You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/09 23:03:39 UTC

svn commit: r703258 - in /incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine: mapReduceLayer/plans/POPackageAnnotator.java physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java

Author: olga
Date: Thu Oct  9 14:03:39 2008
New Revision: 703258

URL: http://svn.apache.org/viewvc?rev=703258&view=rev
Log:
missing files from patch PIG-465

Added:
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java

Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=703258&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Thu Oct  9 14:03:39 2008
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This visitor visits the MRPlan and does the following
+ * for each MROper
+ *  - visits the POPackage in the reduce plan and finds the corresponding
+ *  POLocalRearrange(s) (either in the map plan of the same oper OR
+ *  reduce plan of predecessor MROper). It then annotates the POPackage
+ *  with information about which columns in the "value" are present in the
+ *  "key" and will need to stitched in to the "value"
+ */
+public class POPackageAnnotator extends MROpPlanVisitor {
+
+    /**
+     * @param plan MR plan to visit
+     */
+    public POPackageAnnotator(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        
+        // POPackage could be present in the combine plan
+        // OR in the reduce plan. POPostCombinerPackage could
+        // be present only in the reduce plan. Search in these two
+        // plans accordingly
+        
+        if(!mr.combinePlan.isEmpty()) {
+            PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.combinePlan);
+            pkgDiscoverer.visit();
+            POPackage pkg = pkgDiscoverer.getPkg();
+            if(pkg != null) {
+                handlePackage(mr, pkg);
+            }   
+        }
+        
+        if(!mr.reducePlan.isEmpty()) {
+            PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.reducePlan);
+            pkgDiscoverer.visit();
+            POPackage pkg = pkgDiscoverer.getPkg();
+            if(pkg != null) {
+                // if the POPackage is actually a POPostCombinerPackage, then we should
+                // just look for the corresponding LocalRearrange(s) in the combine plan
+                if(pkg instanceof POPostCombinerPackage) {
+                    if(!patchPackage(mr.combinePlan, pkg)) {
+                        throw new VisitorException("Unexpected problem while trying " +
+                        		"to optimize (could not find LORearrange in combine plan)");
+                    }
+                } else {
+                    handlePackage(mr, pkg);
+                }
+            }
+        }
+        
+    }
+    
+    private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException {
+        // the LocalRearrange(s) could either be in the map of this MapReduceOper
+        // OR in the reduce of predecessor MapReduceOpers
+        if(!patchPackage(mr.mapPlan, pkg)) {
+            // we did not find the LocalRearrange(s) in the map plan
+            // let's look in the predecessors
+            List<MapReduceOper> preds = this.mPlan.getPredecessors(mr);
+            for (Iterator<MapReduceOper> it = preds.iterator(); it.hasNext();) {
+                MapReduceOper mrOper = it.next();
+                if(!patchPackage(mrOper.reducePlan, pkg)) {
+                    throw new VisitorException("Unexpected problem while trying " +
+                            "to optimize (could not find LORearrange in predecessor's reduce plan)");
+                }     
+            }
+        }
+    }
+
+    private boolean patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException {
+        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(plan, pkg);
+        lrDiscoverer.visit();
+        // let our caller know if we managed to patch
+        // the package
+        return lrDiscoverer.isLoRearrangeFound();
+    }
+    
+    /**
+     * Simple visitor of the "Reduce" physical plan
+     * which will get a reference to the POPacakge
+     * present in the plan
+     */
+    class PackageDiscoverer extends PhyPlanVisitor {
+
+        private POPackage pkg;
+        
+        public PackageDiscoverer(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
+         */
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            this.pkg = pkg;
+        };
+        
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
+         */
+        @Override
+        public void visitCombinerPackage(POPostCombinerPackage pkg)
+                throws VisitorException {
+            this.pkg = pkg;
+        }
+
+        /**
+         * @return the pkg
+         */
+        public POPackage getPkg() {
+            return pkg;
+        }
+        
+    }
+    
+    /**
+     * Physical Plan visitor which tries to get the
+     * LocalRearrange(s) present in the plan (if any) and
+     * annotate the POPackage given to it with the information
+     * in the LocalRearrange (regarding columns in the "value"
+     * present in the "key")
+     */
+    class LoRearrangeDiscoverer extends PhyPlanVisitor {
+        
+        private boolean loRearrangeFound = false;
+        private POPackage pkg;
+        
+        public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.pkg = pkg;
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
+         */
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
+            loRearrangeFound = true;
+            Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+            // annotate the package with information from the LORearrange
+            // update the keyInfo information if already present in the POPackage
+            keyInfo = pkg.getKeyInfo();
+            if(keyInfo == null)
+                keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+                
+            keyInfo.put(new Integer(lrearrange.getIndex()), 
+                new Pair<Boolean, Map<Integer, Integer>>(
+                        lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+            pkg.setKeyInfo(keyInfo);
+            pkg.setKeyTuple(lrearrange.isKeyTuple());
+        }
+
+        /**
+         * @return the loRearrangeFound
+         */
+        public boolean isLoRearrangeFound() {
+            return loRearrangeFound;
+        }
+
+    }
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java?rev=703258&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java Thu Oct  9 14:03:39 2008
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Variant of {@link POLocalRearrange} used for illustrate. It evaluates the
+ * embedded key plans and emits a 3-field tuple of the form
+ * (index, grpKey, value) where index is a Byte identifying the input.
+ * For distinct, the index is always 0 and the value is a fixed fake tuple.
+ */
+public class POLocalRearrangeForIllustrate extends POLocalRearrange {
+
+    private static final long serialVersionUID = 1L;
+
+    public POLocalRearrangeForIllustrate(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POLocalRearrangeForIllustrate(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POLocalRearrangeForIllustrate(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    /**
+     * @param k operator key
+     * @param rp requested parallelism (-1 when unspecified)
+     * @param inp input operators, may be null
+     */
+    public POLocalRearrangeForIllustrate(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        index = -1;
+        leafOps = new ArrayList<ExpressionOperator>();
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitLocalRearrangeForIllustrate(this);
+    }
+
+    @Override
+    public String name() {
+        return "Local Rearrange For Illustrate" + "[" + DataType.findTypeName(resultType) +
+            "]" + "{" + DataType.findTypeName(keyType) + "}" + "(" +
+            mIsDistinct + ") - " + mKey.toString();
+    }
+
+    /**
+     * Builds the (index, key, value) output tuple.
+     *
+     * @param resLst results of the key plans; a single result is used as the
+     *        key directly, several are wrapped in a tuple
+     * @param value the input tuple to emit as the value
+     * @return a 3-field tuple (index, key, value)
+     * @throws ExecException if setting a tuple field fails
+     */
+    protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
+        // Construct key
+        Object key;
+        if(resLst.size()>1){
+            Tuple t = mTupleFactory.newTuple(resLst.size());
+            int i=-1;
+            for(Result res : resLst)
+                t.set(++i, res.result);
+            key = t;
+        }
+        else{
+            key = resLst.get(0).result;
+        }
+
+        Tuple output = mTupleFactory.newTuple(3);
+        if (mIsDistinct) {
+            // Put a constant index, the key and the fake tuple
+            // in a tuple and return
+            output.set(0, Byte.valueOf((byte)0));
+            output.set(1, key);
+            output.set(2, mFakeTuple);
+            return output;
+        } else {
+            if(isCross){
+                // drop the first plans.size() fields from the value; this
+                // relies on getAll() returning the tuple's backing list
+                for(int i=0;i<plans.size();i++)
+                    value.getAll().remove(0);
+            }
+
+            // Put the index, key, and value
+            // in a tuple and return
+            output.set(0, Byte.valueOf(index));
+            output.set(1, key);
+            output.set(2, value);
+            return output;
+        }
+    }
+
+    /**
+     * Make a deep copy of this operator: key plans are cloned and the key
+     * type, index, distinct and cross settings are carried over.
+     * @throws CloneNotSupportedException if a key plan cannot be cloned
+     */
+    @Override
+    public POLocalRearrangeForIllustrate clone() throws CloneNotSupportedException {
+        List<PhysicalPlan> clonePlans = new
+            ArrayList<PhysicalPlan>(plans.size());
+        for (PhysicalPlan plan : plans) {
+            clonePlans.add(plan.clone());
+        }
+        POLocalRearrangeForIllustrate clone = new POLocalRearrangeForIllustrate(new OperatorKey(
+            mKey.scope,
+            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+            requestedParallelism);
+        clone.setPlans(clonePlans);
+        clone.keyType = keyType;
+        clone.index = index;
+        // isCross changes the emitted value in constructLROutput, so the
+        // copy must carry it over to behave like the original
+        clone.isCross = isCross;
+        // Needs to be called as setDistinct so that the fake index tuple gets
+        // created.
+        clone.setDistinct(mIsDistinct);
+        return clone;
+    }
+
+}