You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/09 23:03:39 UTC
svn commit: r703258 - in
/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine:
mapReduceLayer/plans/POPackageAnnotator.java
physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
Author: olga
Date: Thu Oct 9 14:03:39 2008
New Revision: 703258
URL: http://svn.apache.org/viewvc?rev=703258&view=rev
Log:
missing files from patch PIG-465
Added:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=703258&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Thu Oct 9 14:03:39 2008
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Visits every {@link MapReduceOper} of an MR plan and annotates the
+ * POPackage found in its combine and/or reduce plan.
+ * <p>
+ * For each package the visitor looks up the corresponding
+ * POLocalRearrange(s) -- in the map plan of the same operator or, failing
+ * that, in the reduce plans of the predecessor operators -- and records on
+ * the package which columns of the "value" are present in the "key" and
+ * will need to be stitched into the "value". A POPostCombinerPackage in the
+ * reduce plan is matched against the POLocalRearrange(s) of the combine
+ * plan instead.
+ */
+public class POPackageAnnotator extends MROpPlanVisitor {
+
+    /**
+     * @param plan MR plan to visit
+     */
+    public POPackageAnnotator(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    /**
+     * Annotates the package(s) of a single map-reduce operator.
+     *
+     * @param mr the map-reduce operator being visited
+     * @throws VisitorException if a package is present but no matching
+     *         LocalRearrange could be located
+     */
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+
+        // POPackage could be present in the combine plan
+        // OR in the reduce plan. POPostCombinerPackage could
+        // be present only in the reduce plan. Search in these two
+        // plans accordingly
+
+        if (!mr.combinePlan.isEmpty()) {
+            POPackage combinePkg = findPackage(mr.combinePlan);
+            if (combinePkg != null) {
+                handlePackage(mr, combinePkg);
+            }
+        }
+
+        if (mr.reducePlan.isEmpty()) {
+            return;
+        }
+
+        POPackage reducePkg = findPackage(mr.reducePlan);
+        if (reducePkg == null) {
+            return;
+        }
+
+        if (reducePkg instanceof POPostCombinerPackage) {
+            // a POPostCombinerPackage is fed by the combiner, so the
+            // corresponding LocalRearrange(s) live in the combine plan
+            if (!patchPackage(mr.combinePlan, reducePkg)) {
+                throw new VisitorException("Unexpected problem while trying " +
+                        "to optimize (could not find LORearrange in combine plan)");
+            }
+        } else {
+            handlePackage(mr, reducePkg);
+        }
+    }
+
+    /**
+     * Runs a {@link PackageDiscoverer} over the given plan.
+     *
+     * @param plan physical plan to search
+     * @return the package recorded by the discoverer, or null if the
+     *         discoverer did not visit one
+     * @throws VisitorException if walking the plan fails
+     */
+    private POPackage findPackage(PhysicalPlan plan) throws VisitorException {
+        PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(plan);
+        pkgDiscoverer.visit();
+        return pkgDiscoverer.getPkg();
+    }
+
+    /**
+     * Annotates the package from the LocalRearrange(s) in the map plan of
+     * the given operator, falling back to the reduce plan of every
+     * predecessor operator when the map plan contains none.
+     *
+     * @param mr  operator owning the package
+     * @param pkg package to annotate
+     * @throws VisitorException if the map plan has no LocalRearrange and
+     *         either there are no predecessors or one of them has no
+     *         LocalRearrange in its reduce plan
+     */
+    private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException {
+        // the LocalRearrange(s) could either be in the map of this MapReduceOper
+        // OR in the reduce of predecessor MapReduceOpers
+        if (patchPackage(mr.mapPlan, pkg)) {
+            return;
+        }
+
+        // we did not find the LocalRearrange(s) in the map plan
+        // let's look in the predecessors
+        List<MapReduceOper> preds = this.mPlan.getPredecessors(mr);
+        if (preds == null || preds.isEmpty()) {
+            // Without this guard a null list would surface as a
+            // NullPointerException and an empty list would silently leave
+            // the package without key information.
+            throw new VisitorException("Unexpected problem while trying " +
+                    "to optimize (could not find LORearrange in map plan " +
+                    "and there are no predecessors to search)");
+        }
+
+        for (MapReduceOper pred : preds) {
+            if (!patchPackage(pred.reducePlan, pkg)) {
+                throw new VisitorException("Unexpected problem while trying " +
+                        "to optimize (could not find LORearrange in predecessor's reduce plan)");
+            }
+        }
+    }
+
+    /**
+     * Runs a {@link LoRearrangeDiscoverer} over the plan so that every
+     * LocalRearrange it visits annotates the package.
+     *
+     * @param plan physical plan to search for LocalRearrange(s)
+     * @param pkg  package to annotate
+     * @return true if at least one LocalRearrange was visited
+     * @throws VisitorException if walking the plan fails
+     */
+    private boolean patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException {
+        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(plan, pkg);
+        lrDiscoverer.visit();
+        // let our caller know if we managed to patch
+        // the package
+        return lrDiscoverer.isLoRearrangeFound();
+    }
+
+    /**
+     * Simple visitor of a physical plan which keeps a reference to the
+     * POPackage (or POPostCombinerPackage) it visits. If several are
+     * visited, the last one wins.
+     */
+    class PackageDiscoverer extends PhyPlanVisitor {
+
+        private POPackage pkg;
+
+        public PackageDiscoverer(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage)
+         */
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            this.pkg = pkg;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
+         */
+        @Override
+        public void visitCombinerPackage(POPostCombinerPackage pkg)
+                throws VisitorException {
+            this.pkg = pkg;
+        }
+
+        /**
+         * @return the pkg, or null if no package was visited
+         */
+        public POPackage getPkg() {
+            return pkg;
+        }
+
+    }
+
+    /**
+     * Physical Plan visitor which tries to get the
+     * LocalRearrange(s) present in the plan (if any) and
+     * annotate the POPackage given to it with the information
+     * in the LocalRearrange (regarding columns in the "value"
+     * present in the "key")
+     */
+    class LoRearrangeDiscoverer extends PhyPlanVisitor {
+
+        private boolean loRearrangeFound = false;
+        private POPackage pkg;
+
+        public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.pkg = pkg;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitLocalRearrange(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange)
+         */
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
+            loRearrangeFound = true;
+
+            // annotate the package with information from the LORearrange
+            // update the keyInfo information if already present in the POPackage
+            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkg.getKeyInfo();
+            if (keyInfo == null) {
+                keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+            }
+
+            // entries are keyed by the index of the LocalRearrange
+            keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+                    new Pair<Boolean, Map<Integer, Integer>>(
+                            lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+            pkg.setKeyInfo(keyInfo);
+            pkg.setKeyTuple(lrearrange.isKeyTuple());
+        }
+
+        /**
+         * @return the loRearrangeFound
+         */
+        public boolean isLoRearrangeFound() {
+            return loRearrangeFound;
+        }
+
+    }
+}
+
Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java?rev=703258&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java Thu Oct 9 14:03:39 2008
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A {@link POLocalRearrange} subclass which, judging by its name, is meant
+ * for the illustrate code path. It dispatches to
+ * {@code visitLocalRearrangeForIllustrate} on the visitor, reports its own
+ * operator name, and builds output tuples of the form
+ * (index, key, value).
+ */
+public class POLocalRearrangeForIllustrate extends POLocalRearrange {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param k operator key
+     */
+    public POLocalRearrangeForIllustrate(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    /**
+     * @param k  operator key
+     * @param rp requested parallelism
+     */
+    public POLocalRearrangeForIllustrate(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    /**
+     * @param k   operator key
+     * @param inp input operators
+     */
+    public POLocalRearrangeForIllustrate(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    /**
+     * @param k   operator key
+     * @param rp  requested parallelism
+     * @param inp input operators
+     */
+    public POLocalRearrangeForIllustrate(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        index = -1;
+        leafOps = new ArrayList<ExpressionOperator>();
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitLocalRearrangeForIllustrate(this);
+    }
+
+    /**
+     * @return a description made of the result type, key type, distinct
+     *         flag and operator key
+     */
+    @Override
+    public String name() {
+        return "Local Rearrange For Illustrate" + "[" + DataType.findTypeName(resultType) +
+                "]" + "{" + DataType.findTypeName(keyType) + "}" + "(" +
+                mIsDistinct + ") - " + mKey.toString();
+    }
+
+    /**
+     * Builds the 3-field output tuple (index, key, value).
+     * <p>
+     * The key is the single result when {@code resLst} holds one entry and
+     * a tuple of all results otherwise. For a distinct the index is 0 and
+     * the value is the fake tuple; otherwise the operator's own index and
+     * the given value are used.
+     *
+     * @param resLst results that make up the key; must hold at least one entry
+     * @param value  the value tuple; modified in place when isCross is set
+     * @return the (index, key, value) tuple
+     * @throws ExecException if a field cannot be set on a tuple
+     */
+    protected Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException {
+        //Construct key
+        Object key;
+        if (resLst.size() > 1) {
+            Tuple keyTuple = mTupleFactory.newTuple(resLst.size());
+            int pos = 0;
+            for (Result res : resLst) {
+                keyTuple.set(pos++, res.result);
+            }
+            key = keyTuple;
+        } else {
+            key = resLst.get(0).result;
+        }
+
+        Tuple output = mTupleFactory.newTuple(3);
+        if (mIsDistinct) {
+            //Put the key and the indexed tuple
+            //in a tuple and return
+            output.set(0, Byte.valueOf((byte) 0));
+            output.set(1, key);
+            output.set(2, mFakeTuple);
+            return output;
+        }
+
+        if (isCross) {
+            // drop one leading entry per plan from the list returned by
+            // value.getAll()
+            for (int i = 0; i < plans.size(); i++) {
+                value.getAll().remove(0);
+            }
+        }
+
+        //Put the index, key, and value
+        //in a tuple and return
+        output.set(0, Byte.valueOf(index));
+        output.set(1, key);
+        output.set(2, value);
+        return output;
+    }
+
+    /**
+     * Make a deep copy of this operator. The copy gets a freshly generated
+     * operator key in the same scope, clones of the embedded plans, and the
+     * same key type, index and distinct flag.
+     * @throws CloneNotSupportedException
+     */
+    @Override
+    public POLocalRearrangeForIllustrate clone() throws CloneNotSupportedException {
+        List<PhysicalPlan> clonePlans = new ArrayList<PhysicalPlan>(plans.size());
+        for (PhysicalPlan plan : plans) {
+            clonePlans.add(plan.clone());
+        }
+
+        OperatorKey copyKey = new OperatorKey(
+                mKey.scope,
+                NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope));
+        POLocalRearrangeForIllustrate copy =
+                new POLocalRearrangeForIllustrate(copyKey, requestedParallelism);
+        copy.setPlans(clonePlans);
+        copy.keyType = keyType;
+        copy.index = index;
+        // Needs to be called as setDistinct so that the fake index tuple gets
+        // created.
+        copy.setDistinct(mIsDistinct);
+        return copy;
+    }
+
+}