You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/13 20:11:04 UTC
svn commit: r1045314 [4/5] - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src...
Added: pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java (added)
+++ pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Attaches illustrators to the operators of a physical plan, and re-attaches
+ * them to plans enhanced by MapReduce compilation (see {@link #revisit}).
+ *
+ * <p>For every operator it equips, the attacher records the operator's
+ * equivalence classes and the data bag exposed by its {@link Illustrator}
+ * (see {@link #getDataMap()}).
+ *
+ * <p>Boolean leaf expressions (comparisons, regexp, is-null, not, bincond)
+ * register a sub-expression outcome slot with the enclosing {@link POFilter}.
+ * That filter then receives one equivalence class per combination of those
+ * outcomes. The same expressions can also appear outside a filter condition,
+ * e.g. a bincond inside a FOREACH. There they still get an illustrator but
+ * register no outcome slot.
+ */
+public class IllustratorAttacher extends PhyPlanVisitor {
+
+    PigContext pigContext;
+
+    LineageTracer lineage;
+
+    HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap;
+
+    private HashMap<PhysicalOperator, DataBag> poToDataMap;
+    private int maxRecords;
+    // true once revisit() has been called; operators that already carry an
+    // illustrator are then left untouched
+    private boolean revisit = false;
+    // outcome slots of the POFilter whose condition plan is currently being
+    // walked; null whenever no filter condition is being walked
+    private ArrayList<Boolean[]> subExpResults = null;
+    private final Map<POLoad, Schema> poloadToSchemaMap;
+
+    /**
+     * Creates an attacher that walks {@code plan} depth-first.
+     *
+     * @param plan the physical plan whose operators receive illustrators
+     * @param lineage lineage tracer handed to every illustrator created here
+     * @param maxRecords value handed to the illustrators of POLoad operators
+     * @param poLoadToSchemaMap per-POLoad schema handed to that load's illustrator
+     * @param hadoopPigContext Pig context handed to every illustrator created here
+     * @throws VisitorException declared by the visitor contract
+     */
+    public IllustratorAttacher(PhysicalPlan plan, LineageTracer lineage, int maxRecords,
+            Map<POLoad, Schema> poLoadToSchemaMap, PigContext hadoopPigContext) throws VisitorException {
+        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        pigContext = hadoopPigContext;
+        this.lineage = lineage;
+        poToEqclassesMap = new HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>>();
+        poToDataMap = new HashMap<PhysicalOperator, DataBag>();
+        this.maxRecords = maxRecords;
+        this.poloadToSchemaMap = poLoadToSchemaMap;
+    }
+
+    /**
+     * Revisits an enhanced physical plan from MR compilation.
+     *
+     * <p>Switches the attacher into revisit mode permanently. The visitor's
+     * current plan and walker are restored even if the walk fails.
+     *
+     * @param plan a physical plan to be traversed
+     * @throws VisitorException if visiting an operator fails
+     */
+    public void revisit(PhysicalPlan plan) throws VisitorException {
+        pushWalker(new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        revisit = true;
+        PhysicalPlan oriPlan = mPlan;
+        mPlan = plan;
+        try {
+            visit();
+        } finally {
+            mPlan = oriPlan;
+            popWalker();
+        }
+    }
+
+    /**
+     * Attaches an illustrator with {@code nEqClasses} empty equivalence classes.
+     * In revisit mode, an operator that already has an illustrator is skipped.
+     */
+    private void setIllustrator(PhysicalOperator po, int nEqClasses) {
+        if (revisit && po.getIllustrator() != null) {
+            return;
+        }
+        LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+        for (int i = 0; i < nEqClasses; ++i) {
+            eqClasses.add(new IdentityHashSet<Tuple>());
+        }
+        poToEqclassesMap.put(po, eqClasses);
+        Illustrator illustrator = new Illustrator(lineage, eqClasses, this, pigContext);
+        po.setIllustrator(illustrator);
+        poToDataMap.put(po, illustrator.getData());
+    }
+
+    /**
+     * Attaches an illustrator that uses the given equivalence classes.
+     * {@code eqClasses} may be null, in which case nothing is recorded in the
+     * equivalence-class map. In revisit mode, an operator that already has an
+     * illustrator is skipped.
+     */
+    private void setIllustrator(PhysicalOperator po, LinkedList<IdentityHashSet<Tuple>> eqClasses) {
+        if (revisit && po.getIllustrator() != null) {
+            return;
+        }
+        Illustrator illustrator = new Illustrator(lineage, eqClasses, this, pigContext);
+        po.setIllustrator(illustrator);
+        if (eqClasses != null) {
+            poToEqclassesMap.put(po, eqClasses);
+        }
+        poToDataMap.put(po, illustrator.getData());
+    }
+
+    /**
+     * Attaches an illustrator with a single empty equivalence class.
+     */
+    void setIllustrator(PhysicalOperator po) {
+        setIllustrator(po, 1);
+    }
+
+    /**
+     * @return the data bag of every operator that received an illustrator, keyed by operator
+     */
+    public Map<PhysicalOperator, DataBag> getDataMap() {
+        return poToDataMap;
+    }
+
+    /**
+     * Adds the sub-expression outcome slot of a boolean leaf expression to the
+     * enclosing filter's list. Does nothing in revisit mode or when the
+     * expression is not part of a filter condition.
+     */
+    private void addSubExpResult(PhysicalOperator expr) {
+        if (!revisit && subExpResults != null) {
+            subExpResults.add(expr.getIllustrator().getSubExpResult());
+        }
+    }
+
+    @Override
+    public void visitLoad(POLoad ld) throws VisitorException{
+        // LOAD from temporary files need no illustrator
+        if (revisit) {
+            return;
+        }
+
+        LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+        poToEqclassesMap.put(ld, eqClasses);
+        eqClasses.add(new IdentityHashSet<Tuple>());
+
+        // loads get the record-count argument and their schema in addition
+        Illustrator illustrator = new Illustrator(lineage, eqClasses, maxRecords, this,
+                poloadToSchemaMap.get(ld), pigContext);
+        ld.setIllustrator(illustrator);
+        poToDataMap.put(ld, illustrator.getData());
+    }
+
+    @Override
+    public void visitStore(POStore st) throws VisitorException{
+        setIllustrator(st, 1);
+    }
+
+    /**
+     * Gives the filter an illustrator, then walks its condition plan. Boolean leaves
+     * found there add their outcome slots to the filter's list. The outer
+     * sub-expression context is restored afterwards, even on failure.
+     */
+    @Override
+    public void visitFilter(POFilter fl) throws VisitorException{
+        setIllustrator(fl, 0);
+        ArrayList<Boolean[]> outerSubExpResults = subExpResults;
+        subExpResults = fl.getIllustrator().getSubExpResults();
+        try {
+            innerPlanAttach(fl, fl.getPlan());
+        } finally {
+            subExpResults = outerSubExpResults;
+        }
+    }
+
+    @Override
+    public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
+        setIllustrator(mg, 1);
+    }
+
+    @Override
+    public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
+        super.visitLocalRearrange(lr);
+        setIllustrator(lr);
+    }
+
+    @Override
+    public void visitPackage(POPackage pkg) throws VisitorException{
+        // only a DISTINCT (non-lite) package owns an equivalence class; the
+        // POForEach after it reuses that class (see visitPOForEach)
+        if (!(pkg instanceof POPackageLite) && pkg.isDistinct()) {
+            setIllustrator(pkg, 1);
+        } else {
+            setIllustrator(pkg, null);
+        }
+    }
+
+    @Override
+    public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
+        setIllustrator(pkg);
+    }
+
+    @Override
+    public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
+        setIllustrator(pkg);
+    }
+
+    @Override
+    public void visitPOForEach(POForEach nfe) throws VisitorException {
+        if (revisit && nfe.getIllustrator() != null) {
+            return;
+        }
+        for (PhysicalPlan innerPlan : nfe.getInputPlans()) {
+            innerPlanAttach(nfe, innerPlan);
+        }
+        List<PhysicalOperator> preds = mPlan.getPredecessors(nfe);
+        if (preds != null && preds.size() == 1 &&
+                preds.get(0) instanceof POPackage &&
+                !(preds.get(0) instanceof POPackageLite) &&
+                ((POPackage) preds.get(0)).isDistinct()) {
+            // equivalence class of POPackage for DISTINCT needs to be used
+            // instead of the succeeding POForEach's equivalence class
+            setIllustrator(nfe, preds.get(0).getIllustrator().getEquivalenceClasses());
+            nfe.getIllustrator().setEqClassesShared();
+        } else {
+            setIllustrator(nfe, 1);
+        }
+    }
+
+    @Override
+    public void visitUnion(POUnion un) throws VisitorException{
+        // setIllustrator already skips operators that have one in revisit mode
+        setIllustrator(un, null);
+    }
+
+    @Override
+    public void visitSplit(POSplit spl) throws VisitorException{
+        if (revisit && spl.getIllustrator() != null) {
+            return;
+        }
+        for (PhysicalPlan poPlan : spl.getPlans()) {
+            innerPlanAttach(spl, poPlan);
+        }
+        setIllustrator(spl);
+    }
+
+    @Override
+    public void visitDemux(PODemux demux) throws VisitorException{
+        if (revisit && demux.getIllustrator() != null) {
+            return;
+        }
+        for (PhysicalPlan innerPlan : demux.getPlans()) {
+            innerPlanAttach(demux, innerPlan);
+        }
+        setIllustrator(demux);
+    }
+
+    @Override
+    public void visitDistinct(PODistinct distinct) throws VisitorException {
+        setIllustrator(distinct, 1);
+    }
+
+    @Override
+    public void visitSort(POSort sort) throws VisitorException {
+        setIllustrator(sort, 1);
+    }
+
+    @Override
+    public void visitProject(POProject proj) throws VisitorException{
+    }
+
+    @Override
+    public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
+        setIllustrator(grt, 0);
+        addSubExpResult(grt);
+    }
+
+    @Override
+    public void visitLessThan(LessThanExpr lt) throws VisitorException{
+        setIllustrator(lt, 0);
+        addSubExpResult(lt);
+    }
+
+    @Override
+    public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
+        setIllustrator(gte, 0);
+        addSubExpResult(gte);
+    }
+
+    @Override
+    public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
+        setIllustrator(lte, 0);
+        addSubExpResult(lte);
+    }
+
+    @Override
+    public void visitEqualTo(EqualToExpr eq) throws VisitorException{
+        setIllustrator(eq, 0);
+        addSubExpResult(eq);
+    }
+
+    @Override
+    public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
+        setIllustrator(eq, 0);
+        addSubExpResult(eq);
+    }
+
+    @Override
+    public void visitRegexp(PORegexp re) throws VisitorException{
+        setIllustrator(re, 0);
+        addSubExpResult(re);
+    }
+
+    @Override
+    public void visitIsNull(POIsNull isNull) throws VisitorException {
+        setIllustrator(isNull, 0);
+        addSubExpResult(isNull);
+    }
+
+    @Override
+    public void visitAnd(POAnd and) throws VisitorException {
+        setIllustrator(and, 0);
+    }
+
+    @Override
+    public void visitOr(POOr or) throws VisitorException {
+        setIllustrator(or, 0);
+    }
+
+    @Override
+    public void visitNot(PONot not) throws VisitorException {
+        setIllustrator(not, 0);
+        addSubExpResult(not);
+    }
+
+    @Override
+    public void visitBinCond(POBinCond binCond) {
+        setIllustrator(binCond, 0);
+        // a bincond commonly sits in a FOREACH plan, outside any filter condition
+        addSubExpResult(binCond);
+    }
+
+    @Override
+    public void visitNegative(PONegative negative) {
+        setIllustrator(negative, 1);
+    }
+
+    @Override
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+    }
+
+    @Override
+    public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
+        // one each for >, ==, and <
+        setIllustrator(compFunc, 3);
+    }
+
+    @Override
+    public void visitMapLookUp(POMapLookUp mapLookUp) {
+        setIllustrator(mapLookUp, 1);
+    }
+
+    @Override
+    public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
+        if (revisit && joinPackage.getIllustrator() != null) {
+            return;
+        }
+        setIllustrator(joinPackage);
+        // the join package's embedded foreach shares the package's illustrator
+        joinPackage.getForEach().setIllustrator(joinPackage.getIllustrator());
+    }
+
+    @Override
+    public void visitCast(POCast cast) {
+    }
+
+    @Override
+    public void visitLimit(POLimit lim) throws VisitorException {
+        setIllustrator(lim, 1);
+    }
+
+    @Override
+    public void visitFRJoin(POFRJoin join) throws VisitorException {
+        // one eq. class per input
+        setIllustrator(join, join.getInputs().size());
+    }
+
+    @Override
+    public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+        // one eq. class per input
+        setIllustrator(join, join.getInputs().size());
+    }
+
+    @Override
+    public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException{
+        // one eq. class per input
+        setIllustrator(mergeCoGrp, mergeCoGrp.getInputs().size());
+    }
+
+    @Override
+    public void visitStream(POStream stream) throws VisitorException {
+        setIllustrator(stream, 1);
+    }
+
+    @Override
+    public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+        // one eq. class per input
+        // do not go to inner plans as they are not booleans so no use of eq. classes
+        setIllustrator(sk, sk.getInputs().size());
+    }
+
+    @Override
+    public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
+    }
+
+    /**
+     * Treated exactly like a regular {@link POForEach}.
+     *
+     * @param optimizedForEach the operator being visited
+     */
+    @Override
+    public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
+        visitPOForEach(optimizedForEach);
+    }
+
+    /**
+     * Intentionally does nothing; no illustrator is attached here.
+     *
+     * @param preCombinerLocalRearrange the operator being visited
+     */
+    @Override
+    public void visitPreCombinerLocalRearrange(
+            POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+    }
+
+    /**
+     * Walks an inner plan of {@code po} with a child walker. If this happens
+     * inside a filter condition (and not in revisit mode), {@code po} then gets
+     * one empty equivalence class per combination of the collected
+     * sub-expression outcomes.
+     */
+    private void innerPlanAttach(PhysicalOperator po, PhysicalPlan plan) throws VisitorException {
+        PlanWalker<PhysicalOperator, PhysicalPlan> childWalker =
+            mCurrentWalker.spawnChildWalker(plan);
+        pushWalker(childWalker);
+        childWalker.walk(this);
+        popWalker();
+        if (subExpResults != null && !revisit) {
+            // NOTE(review): 2^n classes; this grows quickly and overflows int
+            // once a condition has 31 or more boolean leaves
+            int size = 1 << subExpResults.size();
+            LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+            for (int i = 0; i < size; ++i) {
+                eqClasses.add(new IdentityHashSet<Tuple>());
+            }
+            po.getIllustrator().setEquivalenceClasses(eqClasses, po);
+        }
+    }
+}
Modified: pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@
package org.apache.pig.pen;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
@@ -27,10 +28,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Comparator;
+import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.BagFactory;
@@ -38,6 +40,7 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LOCross;
import org.apache.pig.impl.logicalLayer.LODistinct;
import org.apache.pig.impl.logicalLayer.LOFilter;
@@ -47,34 +50,41 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LOSort;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LOVisitor;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.pen.util.LineageTracer;
import org.apache.pig.pen.util.MetricEvaluation;
import org.apache.pig.pen.util.PreOrderDepthFirstWalker;
+import org.apache.pig.pen.util.ExampleTuple;
public class LineageTrimmingVisitor extends LOVisitor {
LogicalPlan plan = null;
- Map<LOLoad, DataBag> baseData = new HashMap<LOLoad, DataBag>();
+ Map<LOLoad, DataBag> baseData;
+ Map<FileSpec, DataBag> inputToDataMap;
Map<LogicalOperator, PhysicalOperator> LogToPhyMap = null;
PhysicalPlan physPlan = null;
double completeness = 100.0;
Log log = LogFactory.getLog(getClass());
- Map<LogicalOperator, Map<IdentityHashSet<Tuple>, Integer>> AffinityGroups = new HashMap<LogicalOperator, Map<IdentityHashSet<Tuple>, Integer>>();
+ Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> AffinityGroups = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
Map<LogicalOperator, LineageTracer> Lineage = new HashMap<LogicalOperator, LineageTracer>();
boolean continueTrimming;
PigContext pc;
+ private ExampleGenerator eg;
public LineageTrimmingVisitor(LogicalPlan plan,
Map<LOLoad, DataBag> baseData,
+ ExampleGenerator eg,
Map<LogicalOperator, PhysicalOperator> LogToPhyMap,
- PhysicalPlan physPlan, PigContext pc) {
+ PhysicalPlan physPlan, PigContext pc) throws IOException, InterruptedException {
super(plan, new PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>(
plan));
// this.baseData.putAll(baseData);
@@ -83,149 +93,66 @@ public class LineageTrimmingVisitor exte
this.LogToPhyMap = LogToPhyMap;
this.pc = pc;
this.physPlan = physPlan;
+ this.eg = eg;
+ this.inputToDataMap = new HashMap<FileSpec, DataBag>();
init();
}
- public void init() {
+ public void init() throws IOException, InterruptedException {
- DerivedDataVisitor visitor = new DerivedDataVisitor(plan, pc, baseData,
- LogToPhyMap, physPlan);
- try {
- visitor.visit();
- } catch (VisitorException e) {
- log.error(e.getMessage());
- }
+ Map<LogicalOperator, DataBag> data = eg.getData();
- LineageTracer lineage = visitor.lineage;
- Lineage.put(plan.getLeaves().get(0), lineage);
- Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OpToEqClasses = visitor.OpToEqClasses;
- Collection<IdentityHashSet<Tuple>> EqClasses = visitor.EqClasses;
- Map<IdentityHashSet<Tuple>, Integer> affinityGroup = new HashMap<IdentityHashSet<Tuple>, Integer>();
- for (IdentityHashSet<Tuple> set : EqClasses) {
- affinityGroup.put(set, 1);
+ LineageTracer lineage = eg.getLineage();
+ Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OpToEqClasses = eg.getLoToEqClassMap();
+ for (LogicalOperator leaf : plan.getLeaves()) {
+ Lineage.put(leaf, lineage);
+ AffinityGroups.put(leaf, eg.getEqClasses());
}
- AffinityGroups.put(plan.getLeaves().get(0), affinityGroup);
completeness = MetricEvaluation.getCompleteness(null,
- visitor.derivedData, OpToEqClasses, true);
- LogToPhyMap = visitor.LogToPhyMap;
+ data, OpToEqClasses, true);
+ LogToPhyMap = eg.getLogToPhyMap();
continueTrimming = true;
}
@Override
protected void visit(LOCogroup cg) throws VisitorException {
+ // can't separate CoGroup from succeeding ForEach
+ if (plan.getSuccessors(cg) != null && plan.getSuccessors(cg).get(0) instanceof LOForEach)
+ return;
+
if (continueTrimming) {
- Map<IdentityHashSet<Tuple>, Integer> affinityGroups = null;
-
- continueTrimming = checkCompleteness(cg);
-
- DerivedDataVisitor visitor = null;
- LineageTracer lineage = null;
- // create affinity groups
- if (cg.getInputs().size() == 1) {
- affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
- LogicalOperator childOp = cg.getInputs().get(0);
- visitor = new DerivedDataVisitor(childOp, null, baseData,
- LogToPhyMap, physPlan);
- try {
- visitor.visit();
- } catch (VisitorException e) {
- log.error(e.getMessage());
- }
-
- lineage = visitor.lineage;
-
- DataBag bag = visitor.evaluateIsolatedOperator(cg);
- for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
- DataBag field;
- try {
- field = (DataBag) it.next().get(1);
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- log.error(e.getMessage());
- throw new VisitorException(
- "Error trimming operator COGROUP operator "
- + cg.getAlias()
- + "in example generator");
- }
- IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
- affinityGroups.put(set, 2);
- for (Iterator<Tuple> it1 = field.iterator(); it1.hasNext();) {
- set.add(it1.next());
- }
- }
-
- // add the equivalence classes obtained from derived data
- // creation
- for (IdentityHashSet<Tuple> set : visitor.EqClasses) {
- affinityGroups.put(set, 1);
- }
- AffinityGroups.put(cg.getInputs().get(0), affinityGroups);
- Lineage.put(cg.getInputs().get(0), lineage);
+ try {
- } else {
- List<DataBag> inputs = new LinkedList<DataBag>();
- visitor = new DerivedDataVisitor(cg, null, baseData,
- LogToPhyMap, physPlan);
- affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
- for (int i = 0; i < cg.getInputs().size(); i++) {
- // affinityGroups = new HashMap<IdentityHashSet<Tuple>,
- // Integer>();
- LogicalOperator childOp = cg.getInputs().get(i);
- // visitor = new DerivedDataVisitor(cg.getInputs().get(i),
- // null, baseData, LogToPhyMap, physPlan);
- visitor.setOperatorToEvaluate(childOp);
- try {
- visitor.visit();
- } catch (VisitorException e) {
- log.error(e.getMessage());
+ continueTrimming = checkCompleteness(cg);
+
+ LineageTracer lineage = null;
+ // create affinity groups
+ if (cg.getInputs().size() == 1) {
+ lineage = eg.getLineage();
+ AffinityGroups.put(cg.getInputs().get(0), eg.getEqClasses());
+ Lineage.put(cg.getInputs().get(0), lineage);
+
+ } else {
+ for (LogicalOperator input : cg.getInputs()) {
+ Lineage.put(input, eg.getLineage());
+ AffinityGroups.put(input, eg.getEqClasses());
}
- // Lineage.put(childOp, visitor.lineage);
- inputs.add(visitor.derivedData.get(childOp));
-
- for (IdentityHashSet<Tuple> set : visitor.EqClasses)
- affinityGroups.put(set, 1);
-
- // AffinityGroups.put(cg.getInputs().get(i),
- // affinityGroups);
}
- for (LogicalOperator input : cg.getInputs()) {
- Lineage.put(input, visitor.lineage);
- AffinityGroups.put(input, affinityGroups);
- }
-
- visitor = new DerivedDataVisitor(cg, null, baseData,
- LogToPhyMap, physPlan);
- DataBag output = visitor.evaluateIsolatedOperator(cg, inputs);
-
- for (int i = 1; i <= cg.getInputs().size(); i++) {
- affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
- for (Iterator<Tuple> it = output.iterator(); it.hasNext();) {
- DataBag bag = null;
- try {
- bag = (DataBag) it.next().get(i);
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- log.error(e.getMessage());
- }
- IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
- affinityGroups.put(set, 1);
- for (Iterator<Tuple> it1 = bag.iterator(); it1
- .hasNext();) {
- set.add(it1.next());
- }
- }
- AffinityGroups.get(cg.getInputs().get(i - 1)).putAll(
- affinityGroups);
-
- }
- AffinityGroups = AffinityGroups;
+ } catch (Exception e) {
+ throw new VisitorException("Exception : "+e.getMessage());
}
}
}
@Override
+    protected void visit(LOJoin join) throws VisitorException {
+        // nothing to do once trimming has been switched off
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(join);
+    }
+
+ @Override
protected void visit(LOCross cs) throws VisitorException {
if(continueTrimming)
processOperator(cs);
@@ -244,6 +171,12 @@ public class LineageTrimmingVisitor exte
if (continueTrimming)
processOperator(filter);
}
+
+    /**
+     * Hands a STORE operator to processOperator while trimming is still enabled.
+     */
+    @Override
+    protected void visit(LOStore store) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(store);
+    }
@Override
protected void visit(LOForEach forEach) throws VisitorException {
@@ -286,9 +219,9 @@ public class LineageTrimmingVisitor exte
}
private Map<LOLoad, DataBag> PruneBaseDataConstrainedCoverage(
- Map<LOLoad, DataBag> baseData, DataBag rootOutput,
+ Map<LOLoad, DataBag> baseData,
LineageTracer lineage,
- Map<IdentityHashSet<Tuple>, Integer> equivalenceClasses) {
+ Collection<IdentityHashSet<Tuple>> equivalenceClasses) {
IdentityHashMap<Tuple, Collection<Tuple>> membershipMap = lineage
.getMembershipMap();
@@ -300,10 +233,9 @@ public class LineageTrimmingVisitor exte
// IdentityHashMap<Tuple, Set<Integer>> lineageGroupToEquivClasses = new
// IdentityHashMap<Tuple, Set<Integer>>();
IdentityHashMap<Tuple, Set<IdentityHashSet<Tuple>>> lineageGroupToEquivClasses = new IdentityHashMap<Tuple, Set<IdentityHashSet<Tuple>>>();
- int equivClassId = 0;
- for (IdentityHashSet<Tuple> equivClass : equivalenceClasses.keySet()) {
- for (Tuple t : equivClass) {
- Tuple lineageGroup = lineage.getRepresentative(t);
+ for (IdentityHashSet<Tuple> equivClass : equivalenceClasses) {
+ for (Object t : equivClass) {
+ Tuple lineageGroup = lineage.getRepresentative((Tuple) t);
// Set<Integer> entry =
// lineageGroupToEquivClasses.get(lineageGroup);
Set<IdentityHashSet<Tuple>> entry = lineageGroupToEquivClasses
@@ -316,8 +248,6 @@ public class LineageTrimmingVisitor exte
// entry.add(equivClassId);
entry.add(equivClass);
}
-
- equivClassId++;
}
// select lineage groups such that we cover all equivalence classes
@@ -325,18 +255,19 @@ public class LineageTrimmingVisitor exte
while (!lineageGroupToEquivClasses.isEmpty()) {
// greedily find the lineage group with the best "score", where
// score = # equiv classes covered / group weight
- double bestScore = -1;
+ double bestWeight = -1;
Tuple bestLineageGroup = null;
Set<IdentityHashSet<Tuple>> bestEquivClassesCovered = null;
+ int bestNumEquivClassesCovered = 0;
for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
double weight = lineageGroupWeights.get(lineageGroup);
Set<IdentityHashSet<Tuple>> equivClassesCovered = lineageGroupToEquivClasses
.get(lineageGroup);
int numEquivClassesCovered = equivClassesCovered.size();
- double score = ((double) numEquivClassesCovered) / weight;
- if (score > bestScore) {
+ if ((numEquivClassesCovered > bestNumEquivClassesCovered) ||
+ (numEquivClassesCovered == bestNumEquivClassesCovered && weight < bestWeight)) {
if (selectedLineageGroups.contains(lineageGroup)) {
bestLineageGroup = lineageGroup;
@@ -344,8 +275,9 @@ public class LineageTrimmingVisitor exte
continue;
}
- bestScore = score;
+ bestWeight = weight;
bestLineageGroup = lineageGroup;
+ bestNumEquivClassesCovered = numEquivClassesCovered;
bestEquivClassesCovered = equivClassesCovered;
}
}
@@ -371,10 +303,7 @@ public class LineageTrimmingVisitor exte
.iterator(); it.hasNext();) {
IdentityHashSet<Tuple> equivClass = it.next();
if (bestEquivClassesCovered.contains(equivClass)) {
- if ((equivalenceClasses.get(equivClass) - 1) <= 0) {
- // equivClasses.remove(equivClass);
- it.remove();
- }
+ it.remove();
}
}
if (equivClasses.size() == 0)
@@ -383,11 +312,6 @@ public class LineageTrimmingVisitor exte
}
for (Tuple removeMe : toRemove)
lineageGroupToEquivClasses.remove(removeMe);
-
- for (IdentityHashSet<Tuple> equivClass : bestEquivClassesCovered) {
- equivalenceClasses.put(equivClass, equivalenceClasses
- .get(equivClass) - 1);
- }
}
// revise baseData to only contain the tuples that are part of
@@ -414,65 +338,150 @@ public class LineageTrimmingVisitor exte
return newBaseData;
}
- private void processOperator(LogicalOperator op) {
- if (op instanceof LOLoad) return;
+ /**
+  * Prunes the example base data of a single load.
+  * <p>
+  * If a bag was already recorded for the load's input file, it is reused as-is.
+  * Otherwise the load's tuples are split into real and synthetic ones and
+  * re-added to a fresh bag through checkNewBaseData, real tuples first.
+  * Synthetic tuples are tried only when the real ones did not reach the current
+  * completeness. Finally, the load's bag in baseData is recorded for its input
+  * file. That bag may have been replaced by the pruned one.
+  *
+  * @param ld the load whose base data is pruned
+  * @throws VisitorException if deriving example data fails
+  */
+ private void processLoad(LOLoad ld) throws VisitorException {
+ // prune base records
+ if (inputToDataMap.get(ld.getInputFile()) != null) {
+ baseData.put(ld, inputToDataMap.get(ld.getInputFile()));
+ return;
+ }
- continueTrimming = checkCompleteness(op);
-
- if (continueTrimming == false)
+ // nothing to prune when the load has no data or fewer than two tuples
+ DataBag data = baseData.get(ld);
+ if (data == null || data.size() < 2)
return;
+ // NOTE(review): HashSet collapses tuples that are equal under Tuple.equals and
+ // iterates in hash order. That order decides which tuples checkNewBaseData
+ // tries first -- confirm both effects are intended.
+ Set<Tuple> realData = new HashSet<Tuple>(), syntheticData = new HashSet<Tuple>();
- LogicalOperator childOp = plan.getPredecessors(op).get(0);
-
- DerivedDataVisitor visitor = new DerivedDataVisitor(childOp, null,
- baseData, LogToPhyMap, physPlan);
- try {
- visitor.visit();
- } catch (VisitorException e) {
- log.error(e.getMessage());
+ for (Iterator<Tuple> it = data.iterator(); it.hasNext(); ) {
+ Tuple t = it.next();
+ if (((ExampleTuple)t).synthetic)
+ syntheticData.add(t);
+ else
+ realData.add(t);
}
+
+ // Candidate base data: this load and every other load reading the same input
+ // file share the bag being rebuilt; all other loads keep their current data.
+ Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
+ DataBag newData = BagFactory.getInstance().newDefaultBag();
+ newBaseData.put(ld, newData);
+ for (Map.Entry<LOLoad, DataBag> entry : baseData.entrySet()) {
+ if (entry.getKey() != ld) {
+ if (!entry.getKey().getInputFile().equals(ld.getInputFile()))
+ newBaseData.put(entry.getKey(), entry.getValue());
+ else
+ newBaseData.put(entry.getKey(), newData);
+ }
+ }
+
+ // checkNewBaseData returns true when the current completeness was not reached
+ if (checkNewBaseData(newData, newBaseData, realData))
+ checkNewBaseData(newData, newBaseData, syntheticData);
+
+ // baseData holds newData only if a checkNewBaseData call copied newBaseData
+ // into it; otherwise the load's original bag is the one recorded here
+ inputToDataMap.put(ld.getInputFile(), baseData.get(ld));
+ }
+
+ /**
+  * Greedily adds tuples from {@code loadData} to {@code data} until the overall
+  * completeness of the derived example data reaches the current best
+  * {@code completeness}.
+  * <p>
+  * First pass: tuples are appended one by one, in the iteration order of
+  * {@code loadData}, and the completeness after each addition is recorded. The
+  * pass stops early once one addition reaches the current completeness.
+  * Second pass: {@code data} is reset to its original contents, and the recorded
+  * tuples are re-added best first until the current completeness is reached.
+  *
+  * @param data the bag being rebuilt; expected to be referenced from
+  *        {@code newBaseData} so that additions affect the derivation
+  * @param newBaseData candidate base data used to re-derive the example data
+  * @param loadData tuples that may be added to {@code data}
+  * @return {@code false} if the current completeness was reached, in which case
+  *         {@code newBaseData} has been copied into {@code baseData};
+  *         {@code true} otherwise, in which case {@code data} is left holding its
+  *         original contents plus every tuple tried in the second pass
+  * @throws VisitorException if deriving the example data fails
+  */
+ private boolean checkNewBaseData(DataBag data, Map<LOLoad, DataBag> newBaseData, Set<Tuple> loadData) throws VisitorException {
+     List<Pair<Tuple, Double>> sortedBase = new LinkedList<Pair<Tuple, Double>>();
+     DataBag oldData = BagFactory.getInstance().newDefaultBag();
+     oldData.addAll(data);
+     double tmpCompleteness = completeness;
+     for (Tuple t : loadData) {
+         data.add(t);
+         double newCompleteness = evaluateCompleteness(newBaseData);
- DataBag bag = visitor.derivedData.get(childOp);
- Map<IdentityHashSet<Tuple>, Integer> affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
-
- for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
- IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
- affinityGroups.put(set, 1);
- set.add(it.next());
+         sortedBase.add(new Pair<Tuple, Double>(t, Double.valueOf(newCompleteness)));
+         if (newCompleteness >= tmpCompleteness)
+             break;
}
+
+     // Best candidates first. Compare the values with Double.compare: "==" on
+     // the boxed Doubles tests identity, so equal scores would never compare as
+     // equal and the comparator would violate its contract.
+     Collections.sort(sortedBase, new Comparator<Pair<Tuple, Double>>() {
+         @Override
+         public int compare(Pair<Tuple, Double> o1, Pair<Tuple, Double> o2) {
+             return Double.compare(o2.second, o1.second);
+         }
+     });
- for (IdentityHashSet<Tuple> set : visitor.EqClasses) {
- // newEquivalenceClasses.put(set, 1);
- affinityGroups.put(set, 1);
+     // restart from the original contents and add the best tuples one by one
+     data.clear();
+     data.addAll(oldData);
+     for (Pair<Tuple, Double> p : sortedBase) {
+         data.add(p.first);
+         double newCompleteness = evaluateCompleteness(newBaseData);
+         if (newCompleteness >= completeness) {
+             completeness = newCompleteness;
+             baseData.putAll(newBaseData);
+             return false;
+         }
}
+     return true;
+ }
+
+ /**
+  * Derives the example data for {@code newBaseData} and returns its overall
+  * completeness.
+  *
+  * @throws VisitorException wrapping (with cause) any failure of the derivation
+  */
+ private double evaluateCompleteness(Map<LOLoad, DataBag> newBaseData) throws VisitorException {
+     Map<LogicalOperator, DataBag> derivedData;
+     try {
+         derivedData = eg.getData(newBaseData);
+     } catch (Exception e) {
+         throw new VisitorException("Exception: " + e.getMessage(), e);
+     }
+     return MetricEvaluation.getCompleteness(null,
+             derivedData, eg.getLoToEqClassMap(), true);
+ }
+
+ /**
+  * Trims the base data for one operator of the plan.
+  * <p>
+  * Loads are delegated to processLoad. For any other operator, the current base
+  * data is pruned against it via checkCompleteness. If trimming should continue,
+  * the example generator's equivalence classes and lineage are registered for
+  * the operator's inputs, so that checkCompleteness can use them when those
+  * operators are processed. For a FOREACH directly over a COGROUP, they are
+  * registered for the COGROUP's inputs instead.
+  *
+  * @param op the operator being processed
+  * @throws VisitorException if pruning or deriving example data fails; any other
+  *         failure is wrapped with the original exception kept as its cause
+  */
+ private void processOperator(LogicalOperator op) throws VisitorException {
+     try {
+         if (op instanceof LOLoad) {
+             processLoad((LOLoad) op);
+             return;
+         }
+
+         continueTrimming = checkCompleteness(op);
- AffinityGroups.put(childOp, affinityGroups);
- Lineage.put(childOp, visitor.lineage);
+         List<LogicalOperator> childOps = plan.getPredecessors(op);
+         if (childOps == null || childOps.isEmpty() || !continueTrimming)
+             return;
+
+         LogicalOperator childOp = childOps.get(0);
+         if (op instanceof LOForEach && childOp instanceof LOCogroup) {
+             // register the COGROUP's inputs rather than the COGROUP itself
+             LOCogroup cg = (LOCogroup) childOp;
+             for (LogicalOperator input : cg.getInputs()) {
+                 AffinityGroups.put(input, eg.getEqClasses());
+                 Lineage.put(input, eg.getLineage());
+             }
+         } else {
+             for (LogicalOperator lo : childOps) {
+                 AffinityGroups.put(lo, eg.getEqClasses());
+                 Lineage.put(lo, eg.getLineage());
+             }
+         }
+     } catch (VisitorException e) {
+         // already the declared type; do not re-wrap and lose the original
+         throw e;
+     } catch (Exception e) {
+         throw new VisitorException("Exception: " + e.getMessage(), e);
+     }
}
- private boolean checkCompleteness(LogicalOperator op) {
+ private boolean checkCompleteness(LogicalOperator op) throws Exception, VisitorException {
LineageTracer lineage = Lineage.get(op);
Lineage.remove(op);
- Map<IdentityHashSet<Tuple>, Integer> affinityGroups = AffinityGroups
+ Collection<IdentityHashSet<Tuple>> affinityGroups = AffinityGroups
.get(op);
AffinityGroups.remove(op);
Map<LOLoad, DataBag> newBaseData = PruneBaseDataConstrainedCoverage(
- baseData, null, lineage, affinityGroups);
+ baseData, lineage, affinityGroups);
// obtain the derived data
- DerivedDataVisitor visitor = new DerivedDataVisitor(plan, null,
- newBaseData, LogToPhyMap, physPlan);
- try {
- visitor.visit();
- } catch (VisitorException e) {
- log.error(e.getMessage());
- }
-
+ Map<LogicalOperator, DataBag> derivedData = eg.getData(newBaseData);
double newCompleteness = MetricEvaluation.getCompleteness(null,
- visitor.derivedData, visitor.OpToEqClasses, true);
+ derivedData, eg.getLoToEqClassMap(), true);
if (newCompleteness >= completeness) {
completeness = newCompleteness;
Added: pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java (added)
+++ pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ConfigurationValidator;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.builtin.ReadScalars;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.PigException;
+
+
+
+/**
+ * Runs a compiled Map-Reduce plan in-process for ILLUSTRATE.
+ * <p>
+ * The physical plan is compiled into Map-Reduce jobs. The map and reduce plans
+ * of each job are run locally through contexts from {@code getIllustratorContext}.
+ * Loads are fed from the output of earlier jobs or from the supplied example
+ * base data whenever such data is available.
+ */
+public class LocalMapReduceSimulator {
+
+    private MapReduceLauncher launcher = new MapReduceLauncher();
+
+    private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();
+
+    /**
+     * Compiles {@code php} into Map-Reduce jobs and runs every job locally, one
+     * compilation round at a time, until the Map-Reduce plan is exhausted.
+     *
+     * @param php the physical plan to simulate
+     * @param baseData example input per load, matched to a POLoad through the
+     *        load's schema file name; may be {@code null}
+     * @param poLoadToLogMap not read by this method
+     * @param lineage not read by this method
+     * @param attacher re-attaches illustrator state to the plans produced by
+     *        Map-Reduce compilation and supplies the data recorded for stores
+     * @param eg not read by this method
+     * @param pc the Pig context used for compilation and job configuration
+     * @throws ExecException if a compilation round yields no JobControl, which is
+     *         reported as an unsupported native job
+     */
+    @SuppressWarnings("unchecked")
+    public void launchPig(PhysicalPlan php, Map<LOLoad, DataBag> baseData,
+                          Map<PhysicalOperator, LogicalOperator> poLoadToLogMap,
+                          LineageTracer lineage,
+                          IllustratorAttacher attacher,
+                          ExampleGenerator eg,
+                          PigContext pc) throws PigException, IOException, InterruptedException {
+        phyToMRMap.clear();
+        MROperPlan mrp = launcher.compile(php, pc);
+
+        HExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+
+        List<Pair<PigNullableWritable, Writable>> intermediateData = new ArrayList<Pair<PigNullableWritable, Writable>>();
+
+        Map<Job, MapReduceOper> jobToMroMap = jcc.getJobMroMap();
+        HashMap<String, DataBag> output = new HashMap<String, DataBag>();
+        final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
+        while (mrp.size() != 0) {
+            JobControl jc = jcc.compile(mrp, "Illustrator");
+            if (jc == null) {
+                throw new ExecException("Native execution is not supported");
+            }
+            List<Job> jobs = jc.getWaitingJobs();
+            for (Job job : jobs) {
+                Configuration jobConf = job.getJobConf();
+                FileLocalizer.setInitialized(false);
+                ArrayList<ArrayList<OperatorKey>> inpTargets =
+                    (ArrayList<ArrayList<OperatorKey>>)
+                      ObjectSerializer.deserialize(jobConf.get("pig.inpTargets"));
+                intermediateData.clear();
+                MapReduceOper mro = jobToMroMap.get(job);
+                PhysicalOperator pack = null;
+                // revisit as there are new physical operators from MR compilation
+                if (!mro.mapPlan.isEmpty())
+                    attacher.revisit(mro.mapPlan);
+                if (!mro.reducePlan.isEmpty()) {
+                    attacher.revisit(mro.reducePlan);
+                    pack = mro.reducePlan.getRoots().get(0);
+                }
+
+                List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+                // Collect stores into a fresh list. It is never null, even when both
+                // plans are empty, and the lists returned by PlanHelper are not mutated.
+                List<POStore> stores = new ArrayList<POStore>();
+                if (!mro.mapPlan.isEmpty())
+                    stores.addAll(PlanHelper.getStores(mro.mapPlan));
+                if (!mro.reducePlan.isEmpty())
+                    stores.addAll(PlanHelper.getStores(mro.reducePlan));
+
+                for (POStore store : stores) {
+                    output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
+                }
+
+                OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
+                oa.visit();
+
+                if (!mro.reducePlan.isEmpty()) {
+                    oa = new OutputAttacher(mro.reducePlan, output);
+                    oa.visit();
+                }
+
+                // loads whose data is already in memory are removed from the map plan;
+                // their tuples are handed to the mapper context below instead
+                for (POLoad ld : lds) {
+                    if (findInput(ld, output, baseData) != null)
+                        mro.mapPlan.remove(ld);
+                }
+
+                int index = 0;
+                for (POLoad ld : lds) {
+                    // newly generated data first, then the example base data
+                    DataBag input = findInput(ld, output, baseData);
+                    boolean needFileInput = (input == null);
+                    PigSplit split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
+                    ++index;
+                    Mapper<Text, Tuple, PigNullableWritable, Writable> map;
+
+                    if (mro.reducePlan.isEmpty()) {
+                        // map-only
+                        map = new PigMapOnly.Map();
+                        ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapOnly.Map) map)
+                            .getIllustratorContext(jobConf, input, intermediateData, split);
+                        map.run(context);
+                    } else {
+                        if ("true".equals(jobConf.get("pig.usercomparator")))
+                            map = new PigMapReduce.MapWithComparator();
+                        else if (!"".equals(jobConf.get("pig.keyDistFile", "")))
+                            map = new PigMapReduce.MapWithPartitionIndex();
+                        else
+                            map = new PigMapReduce.Map();
+                        Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map)
+                            .getIllustratorContext(jobConf, input, intermediateData, split);
+                        ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        map.run(context);
+                    }
+                }
+
+                if (!mro.reducePlan.isEmpty()) {
+                    if (pack instanceof POPackage)
+                        mro.reducePlan.remove(pack);
+                    // reducer run
+                    PigMapReduce.Reduce reduce;
+                    if ("true".equals(jobConf.get("pig.usercomparator")))
+                        reduce = new PigMapReduce.ReduceWithComparator();
+                    else
+                        reduce = new PigMapReduce.Reduce();
+                    reduce.setReducePlan(mro.reducePlan);
+                    Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable>.Context
+                        context = reduce.getIllustratorContext(job, intermediateData, (POPackage) pack);
+                    reduce.run(context);
+                }
+                phyToMRMap.putAll(mro.phyToMRMap);
+            }
+
+            // drop the jobs that were just simulated; the removed-operator count is not needed
+            jcc.updateMROpPlan(new LinkedList<Job>());
+        }
+
+        jcc.reset();
+    }
+
+    /**
+     * Looks up in-memory input for a load. It first checks the data produced by
+     * earlier jobs, keyed by file name. It then checks the example base data of
+     * the load whose schema file equals the load's file name.
+     *
+     * @return the matching bag, or {@code null} if none is available
+     */
+    private static DataBag findInput(POLoad ld, Map<String, DataBag> output,
+                                     Map<LOLoad, DataBag> baseData) {
+        String fileName = ld.getLFile().getFileName();
+        DataBag input = output.get(fileName);
+        if (input == null && baseData != null) {
+            for (Map.Entry<LOLoad, DataBag> entry : baseData.entrySet()) {
+                // skip loads without a schema file instead of failing on them
+                if (entry.getKey().getSchemaFile() != null
+                        && entry.getKey().getSchemaFile().equals(fileName)) {
+                    return entry.getValue();
+                }
+            }
+        }
+        return input;
+    }
+
+    /** Hands the shared output buffer to every ReadScalars UDF of a plan. */
+    private static class OutputAttacher extends PhyPlanVisitor {
+        private Map<String, DataBag> outputBuffer;
+        OutputAttacher(PhysicalPlan plan, Map<String, DataBag> output) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.outputBuffer = output;
+        }
+
+        @Override
+        public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+            if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) {
+                ((ReadScalars) userFunc.getFunc()).setOutputBuffer(outputBuffer);
+            }
+        }
+    }
+
+    /**
+     * @return the physical-operator mapping collected from the Map-Reduce
+     *         operators of the last {@link #launchPig} call
+     */
+    public Map<PhysicalOperator, PhysicalOperator> getPhyToMRMap() {
+        return phyToMRMap;
+    }
+}
Added: pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java (added)
+++ pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,27 @@
+package org.apache.pig.pen;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+
+/**
+ * Logical-plan visitor that resets the join type of every LOJoin and the group
+ * type of every LOCogroup it walks. The intent is plain hash joins and regular
+ * groups only, presumably so that ILLUSTRATE does not run specialized strategies
+ * (TODO confirm).
+ */
+public class POOptimizeDisabler extends LogicalRelationalNodesVisitor {
+
+    /**
+     * @param plan the logical plan, walked in dependency order
+     * @throws FrontendException if the underlying visitor cannot be created
+     */
+    public POOptimizeDisabler(OperatorPlan plan) throws FrontendException {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    /** Falls back to the default (hash) join. */
+    @Override
+    public void visit(LOJoin joinOp) throws FrontendException {
+        joinOp.resetJoinType();
+    }
+
+    /** Falls back to the regular group implementation. */
+    @Override
+    public void visit(LOCogroup cogroup) throws FrontendException {
+        cogroup.resetGroupType();
+    }
+}
Added: pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java (added)
+++ pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Physical-plan visitor that prepares a plan for the next Map-Reduce compilation
+ * by clearing the key info of each package operator dispatched to visitPackage.
+ */
+public class PhysicalPlanResetter extends PhyPlanVisitor {
+
+    /**
+     * @param plan the physical plan to reset, walked depth first
+     */
+    public PhysicalPlanResetter(PhysicalPlan plan) {
+        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+    }
+
+    /** Drops the key info; presumably it is rebuilt by the next compilation (TODO confirm). */
+    @Override
+    public void visitPackage(POPackage pack) throws VisitorException {
+        pack.setKeyInfo(null);
+    }
+}
Modified: pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java Mon Dec 13 19:11:00 2010
@@ -21,13 +21,18 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOForEach;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOLimit;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.IdentityHashSet;
@@ -62,26 +67,36 @@ public class DisplayExamples {
}
+ /**
+  * Renders the example data of the plan as text tables. The rendering starts
+  * from every leaf of {@code lp}, and inputs are printed before the operators
+  * that read them.
+  *
+  * @param lp the logical plan whose operators are printed
+  * @param exampleData example data per operator
+  * @param forEachInnerLogToDataMap example data of the operators nested in each FOREACH
+  * @return the rendered tables
+  */
public static String printTabular(LogicalPlan lp,
- Map<LogicalOperator, DataBag> exampleData) {
+ Map<LogicalOperator, DataBag> exampleData,
+ Map<LOForEach, Map<LogicalOperator, DataBag>> forEachInnerLogToDataMap) {
StringBuffer output = new StringBuffer();
-
- LogicalOperator currentOp = lp.getLeaves().get(0);
- printTabular(currentOp, exampleData, output);
+ // shared across leaves so an operator reachable from several leaves is printed once
+ Set<LogicalOperator> printed = new HashSet<LogicalOperator>();
+ List<LogicalOperator> leaves = lp.getLeaves();
+ for (LogicalOperator leaf : leaves) {
+     printTabular(leaf, exampleData, forEachInnerLogToDataMap, printed, output);
+ }
return output.toString();
}
static void printTabular(LogicalOperator op,
- Map<LogicalOperator, DataBag> exampleData, StringBuffer output) {
- DataBag bag = exampleData.get(op);
+ Map<LogicalOperator, DataBag> exampleData,
+ Map<LOForEach, Map<LogicalOperator, DataBag>> forEachInnerLogToDataMap,
+ Set<LogicalOperator> seen,
+ StringBuffer output) {
List<LogicalOperator> inputs = op.getPlan().getPredecessors(op);
if (inputs != null) { // to avoid an exception when op == LOLoad
for (LogicalOperator Op : inputs) {
- printTabular(Op, exampleData, output);
+ if (!seen.contains(Op))
+ printTabular(Op, exampleData, forEachInnerLogToDataMap, seen, output);
}
}
+ seen.add(op);
+ // print inner block first
+ if ((op instanceof LOForEach)) {
+ printNestedTabular((LOForEach)op, forEachInnerLogToDataMap, exampleData.get(op), output);
+ }
+
if (op.getAlias() != null) {
- // printTable(op, bag, output);
+ DataBag bag = exampleData.get(op);
try {
DisplayTable(MakeArray(op, bag), op, bag, output);
} catch (FrontendException e) {
@@ -95,6 +110,45 @@ public class DisplayExamples {
}
+ /**
+  * Prints the example data of the operators nested in each plan of a FOREACH.
+  *
+  * @param foreach the FOREACH whose nested plans are printed
+  * @param forEachInnerLogToDataMap example data of nested operators per FOREACH;
+  *        nothing is printed when it is null or has no entry for {@code foreach}
+  * @param foreachData example data of the FOREACH itself
+  * @param output buffer the tables are appended to
+  */
+ static void printNestedTabular(LOForEach foreach,
+         Map<LOForEach, Map<LogicalOperator, DataBag>> forEachInnerLogToDataMap,
+         DataBag foreachData,
+         StringBuffer output) {
+     List<LogicalPlan> plans = foreach.getForEachPlans();
+     if (plans == null || forEachInnerLogToDataMap == null)
+         return;
+     Map<LogicalOperator, DataBag> logToDataMap = forEachInnerLogToDataMap.get(foreach);
+     if (logToDataMap == null)
+         return;
+     for (LogicalPlan plan : plans) {
+         List<LogicalOperator> leaves = plan.getLeaves();
+         if (leaves != null && !leaves.isEmpty())
+             printNestedTabular(leaves.get(0), foreach.getAlias(), foreachData, logToDataMap, output);
+     }
+ }
+
+ /**
+  * Prints the example data of {@code lo} inside a nested FOREACH plan, after
+  * recursively printing its predecessors. Operators without data or without an
+  * alias produce no table.
+  *
+  * @param lo the nested operator to print
+  * @param foreachAlias alias of the enclosing FOREACH, used in table headers
+  * @param foreachData example data of the enclosing FOREACH
+  * @param logToDataMap example data of the nested operators; nothing is printed when null
+  * @param output buffer the tables are appended to
+  */
+ static void printNestedTabular(LogicalOperator lo, String foreachAlias, DataBag foreachData,
+         Map<LogicalOperator, DataBag> logToDataMap, StringBuffer output) {
+     if (logToDataMap == null)
+         return;
+
+     List<LogicalOperator> inputs = lo.getPlan().getPredecessors(lo);
+     if (inputs != null) {
+         for (LogicalOperator op : inputs)
+             printNestedTabular(op, foreachAlias, foreachData, logToDataMap, output);
+     }
+
+     DataBag bag = logToDataMap.get(lo);
+     if (bag == null || lo.getAlias() == null)
+         return;
+
+     try {
+         // DisplayNestedTable takes the tabulated bag before foreachData; swapping
+         // them made the row count come from the FOREACH's data instead of the table's
+         DisplayNestedTable(MakeArray(lo, bag), lo, foreachAlias, bag, foreachData, output);
+     } catch (Exception e) {
+         // best effort: report the failure and continue with the remaining tables
+         e.printStackTrace();
+     }
+ }
+
public static void printSimple(LogicalOperator op,
Map<LogicalOperator, DataBag> exampleData) {
DataBag bag = exampleData.get(op);
@@ -126,6 +180,9 @@ public class DisplayExamples {
static void DisplayTable(String[][] table, LogicalOperator op, DataBag bag,
StringBuffer output) throws FrontendException {
+ if (op instanceof LOStore && ((LOStore) op).isTmpStore())
+ return;
+
int cols = op.getSchema().getFields().size();
List<FieldSchema> fields = op.getSchema().getFields();
int rows = (int) bag.size();
@@ -136,7 +193,7 @@ public class DisplayExamples {
maxColSizes[i] = 5;
}
int total = 0;
- int aliasLength = op.getAlias().length() + 4;
+ int aliasLength = (op instanceof LOStore ? op.getAlias().length() + 12 : op.getAlias().length() + 4);
for (int j = 0; j < cols; ++j) {
for (int i = 0; i < rows; ++i) {
int length = table[i][j].length();
@@ -146,12 +203,20 @@ public class DisplayExamples {
total += maxColSizes[j];
}
+ // Note of limit reset
+ if (op instanceof LOLimit) {
+ output.append("\nThe limit now in use, " + ((LOLimit)op).getLimit() + ", may have been changed for ILLUSTRATE purpose.\n");
+ }
+
// Display the schema first
output
.append(AddSpaces(total + 3 * (cols + 1) + aliasLength + 1,
false)
+ "\n");
- output.append("| " + op.getAlias() + AddSpaces(4, true) + " | ");
+ if (op instanceof LOStore)
+ output.append("| Store : " + op.getAlias() + AddSpaces(4, true) + " | ");
+ else
+ output.append("| " + op.getAlias() + AddSpaces(4, true) + " | ");
for (int i = 0; i < cols; ++i) {
String field = fields.get(i).toString();
output.append(field
@@ -178,6 +243,61 @@ public class DisplayExamples {
+ "\n");
}
+ /**
+  * Appends one text table with the example data of an operator nested in a
+  * FOREACH. The header row names the operator {@code foreachAlias.alias}.
+  *
+  * @param table cell strings as {@code table[row][column]}; needs at least
+  *        {@code bag.size()} rows and one column per schema field of {@code op}
+  * @param op the nested operator, supplying the alias and the schema
+  * @param foreachAlias alias of the enclosing FOREACH
+  * @param bag the example data being displayed; its size is the row count
+  * @param foreachData not read by this method
+  * @param output buffer the table is appended to
+  * @throws FrontendException presumably from {@code op.getSchema()} (TODO confirm)
+  */
+ static void DisplayNestedTable(String[][] table, LogicalOperator op, String foreachAlias, DataBag bag,
+         DataBag foreachData, StringBuffer output) throws FrontendException {
+     List<FieldSchema> fields = op.getSchema().getFields();
+     int cols = fields.size();
+     int rows = (int) bag.size();
+     int[] maxColSizes = new int[cols];
+     for (int i = 0; i < cols; ++i) {
+         maxColSizes[i] = Math.max(fields.get(i).toString().length(), 5);
+     }
+     // width of the header's alias cell: foreachAlias + "." + alias + 4 spaces
+     int aliasLength = foreachAlias.length() + 1 + op.getAlias().length() + 4;
+     int total = 0;
+     for (int j = 0; j < cols; ++j) {
+         for (int i = 0; i < rows; ++i) {
+             maxColSizes[j] = Math.max(maxColSizes[j], table[i][j].length());
+         }
+         total += maxColSizes[j];
+     }
+     // horizontal rule spanning all columns, separators and the alias cell
+     String rule = AddSpaces(total + 3 * (cols + 1) + aliasLength + 1, false) + "\n";
+
+     // Display the schema first
+     output.append(rule);
+     output.append("| " + foreachAlias + "." + op.getAlias() + AddSpaces(4, true) + " | ");
+     for (int i = 0; i < cols; ++i) {
+         String field = fields.get(i).toString();
+         output.append(field + AddSpaces(maxColSizes[i] - field.length(), true) + " | ");
+     }
+     output.append("\n" + rule);
+     // now start displaying the data
+     for (int i = 0; i < rows; ++i) {
+         output.append("| " + AddSpaces(aliasLength, true) + " | ");
+         for (int j = 0; j < cols; ++j) {
+             String str = table[i][j];
+             output.append(str + AddSpaces(maxColSizes[j] - str.length(), true) + " | ");
+         }
+         output.append("\n");
+     }
+     // now display the finish line
+     output.append(rule);
+ }
+
static String[][] MakeArray(LogicalOperator op, DataBag bag)
throws Exception {
int rows = (int) bag.size();
@@ -201,8 +321,12 @@ public class DisplayExamples {
else {
// System.out.println("Unrecognized data-type received!!!");
// return null;
- if (DataType.findTypeName(d) != null)
- return d.toString();
+ if (DataType.findTypeName(d) != null) {
+ if (d == null)
+ return "";
+ else
+ return d.toString();
+ }
}
System.out.println("Unrecognized data-type received!!!");
return null;
Modified: pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java Mon Dec 13 19:11:00 2010
@@ -24,9 +24,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
//Example tuple adds 2 booleans to Tuple
//synthetic say whether the tuple was generated synthetically
@@ -36,17 +34,21 @@ public class ExampleTuple implements Tup
public boolean synthetic = false;
public boolean omittable = true;
- Tuple t;
+ Object expr = null;
+ Tuple t = null;
+ /** Creates an example tuple with neither a wrapped tuple nor an expression (both stay null). */
public ExampleTuple() {
}
+
+ /**
+  * Creates an example tuple that holds {@code expr} and wraps no tuple.
+  * NOTE(review): what {@code expr} represents is not visible here -- confirm.
+  */
+ public ExampleTuple(Object expr) {
+ this.expr = expr;
+ }
+ /** Wraps {@code t}; the tuple is kept by reference, not copied. */
public ExampleTuple(Tuple t) {
// Have to do it like this because Tuple is an interface, we don't
// have access to its internal structures.
this.t = t;
-
}
@Override
Modified: pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java Mon Dec 13 19:11:00 2010
@@ -20,7 +20,6 @@ package org.apache.pig.pen.util;
import java.util.*;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.util.IdentityHashSet;
public class LineageTracer {
Modified: pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java Mon Dec 13 19:11:00 2010
@@ -19,13 +19,11 @@
package org.apache.pig.pen.util;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.LOFilter;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.util.IdentityHashSet;
@@ -118,24 +116,13 @@ public class MetricEvaluation {
int noClasses = 0;
int noCoveredClasses = 0;
int noOperators = 0;
- Map<Integer, Boolean> coveredClasses;
float completeness = 0;
if (!overallCompleteness) {
Collection<IdentityHashSet<Tuple>> eqClasses = OperatorToEqClasses
.get(op);
- DataBag bag;
- if (op instanceof LOFilter)
- bag = exampleData.get(((LOFilter) op).getInput());
- else
- bag = exampleData.get(op);
- coveredClasses = getCompletenessLogic(bag, eqClasses);
+ noCoveredClasses = getCompletenessLogic(eqClasses);
noClasses = eqClasses.size();
- for (Map.Entry<Integer, Boolean> e : coveredClasses.entrySet()) {
- if (e.getValue()) {
- noCoveredClasses++;
- }
- }
return 100 * ((float) noCoveredClasses) / (float) noClasses;
} else {
@@ -150,20 +137,8 @@ public class MetricEvaluation {
continue; // we want to consider join a single operator
noOperators++;
Collection<IdentityHashSet<Tuple>> eqClasses = e.getValue();
- LogicalOperator lop = e.getKey();
- DataBag bag;
- if (lop instanceof LOFilter)
- bag = exampleData.get(((LOFilter) lop).getInput());
- else
- bag = exampleData.get(lop);
- coveredClasses = getCompletenessLogic(bag, eqClasses);
+ noCoveredClasses = getCompletenessLogic(eqClasses);
noClasses += eqClasses.size();
- for (Map.Entry<Integer, Boolean> e_result : coveredClasses
- .entrySet()) {
- if (e_result.getValue()) {
- noCoveredClasses++;
- }
- }
completeness += 100 * ((float) noCoveredClasses / (float) noClasses);
}
completeness /= (float) noOperators;
@@ -173,23 +148,13 @@ public class MetricEvaluation {
}
- private static Map<Integer, Boolean> getCompletenessLogic(DataBag bag,
+    /**
+     * Counts how many of the given equivalence classes hold at least one tuple.
+     *
+     * @param eqClasses equivalence classes of tuples recorded for one operator
+     * @return the number of non-empty classes
+     */
+    private static int getCompletenessLogic(
Collection<IdentityHashSet<Tuple>> eqClasses) {
- Map<Integer, Boolean> coveredClasses = new HashMap<Integer, Boolean>();
-
- for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
- Tuple t = it.next();
- int classId = 0;
- for (IdentityHashSet<Tuple> eqClass : eqClasses) {
-
- if (eqClass.contains(t) || eqClass.size() == 0) {
- coveredClasses.put(classId, true);
- }
- classId++;
- }
+        // Start from the total number of classes and drop the empty ones.
+        int nonEmpty = eqClasses.size();
+        for (IdentityHashSet<Tuple> eqClass : eqClasses) {
+            if (eqClass.isEmpty())
+                nonEmpty--;
}
-
- return coveredClasses;
-
+        return nonEmpty;
}
}
Modified: pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java Mon Dec 13 19:11:00 2010
@@ -33,6 +33,9 @@ import org.apache.pig.impl.util.Utils;
public class PreOrderDepthFirstWalker<O extends Operator, P extends OperatorPlan<O>>
extends PlanWalker<O, P> {
+
+ private boolean branchFlag = false;
+
/**
* @param plan
* Plan for this walker to traverse.
@@ -41,6 +44,14 @@ public class PreOrderDepthFirstWalker<O
super(plan);
}
+    /**
+     * Sets the branch flag. depthFirst() records the flag's value before
+     * walking a node's predecessors and restores that value before visiting
+     * each unseen predecessor. A flag set while visiting one predecessor is
+     * therefore still seen by that predecessor's own ancestors, but is reset
+     * before its siblings are visited.
+     */
+    public void setBranchFlag() {
+        this.branchFlag = true;
+    }
+
+    /**
+     * @return whether the branch flag is currently set
+     */
+    public boolean getBranchFlag() {
+        return this.branchFlag;
+    }
+
/**
* Begin traversing the graph.
*
@@ -66,8 +77,10 @@ public class PreOrderDepthFirstWalker<O
if (predecessors == null)
return;
+ boolean thisBranchFlag = branchFlag;
for (O pred : predecessors) {
if (seen.add(pred)) {
+ branchFlag = thisBranchFlag;
pred.visit(visitor);
Collection<O> newPredecessors = Utils.mergeCollection(mPlan.getPredecessors(pred), mPlan.getSoftLinkPredecessors(pred));
depthFirst(pred, newPredecessors, seen, visitor);
Added: pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java (added)
+++ pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+
+/**
+ * Walks a physical plan backwards, starting from the physical counterpart of
+ * one logical operator. Ancestors (soft-link predecessors included) are
+ * visited before their descendants, and the starting operator is visited
+ * last. Every visit is guarded by the {@code seen} set, which spawned child
+ * walkers share, so an operator is visited at most once across them.
+ */
+@SuppressWarnings("unchecked")
+public class ReverseDepthFirstWalker<O extends Operator, P extends OperatorPlan<O>>
+        extends PlanWalker<O, P> {
+
+    /**
+     * Physical counterparts (via {@code l2pMap}) of the logical predecessors
+     * of {@link #lo}. This is {@code null} when no logical plan was given or
+     * {@code lo} has no logical predecessors.
+     *
+     * NOTE(review): this set is built but not consulted during the walk. The
+     * check that previously followed each visit ended in a no-op
+     * {@code continue} and threw NullPointerException whenever the set was
+     * null. If the traversal was meant to stop at these operators, confirm
+     * that with the callers before adding pruning.
+     */
+    private HashSet<O> stoppers = null;
+
+    /** Logical operator whose physical counterpart the walk ends on. */
+    private LogicalOperator lo;
+
+    /** Logical plan containing {@link #lo}; may be null. */
+    private LogicalPlan lp;
+
+    /** Logical-to-physical operator mapping used to locate the start node. */
+    Map<LogicalOperator, O> l2pMap;
+
+    /** Operators already visited; shared with walkers spawned from this one. */
+    Set<O> seen;
+
+    /**
+     * @param plan
+     *            Plan for this walker to traverse.
+     * @param op
+     *            Logical operator whose physical counterpart, looked up in
+     *            {@code l2pMap}, is the last node visited.
+     * @param lp
+     *            Logical plan containing {@code op}; may be null.
+     * @param l2pMap
+     *            Mapping from logical operators to physical operators.
+     * @param seen
+     *            Already-visited operators to share with other walkers, or
+     *            null to start from an empty set.
+     */
+    public ReverseDepthFirstWalker(P plan, LogicalOperator op, LogicalPlan lp,
+            Map<LogicalOperator, O> l2pMap, Set<O> seen) {
+        super(plan);
+        Collection<LogicalOperator> logicalPreds =
+                (lp == null) ? null : lp.getPredecessors(op);
+        if (logicalPreds != null) {
+            stoppers = new HashSet<O>();
+            for (LogicalOperator pred : logicalPreds)
+                stoppers.add(l2pMap.get(pred));
+        }
+        this.lo = op;
+        this.lp = lp;
+        this.l2pMap = l2pMap;
+        this.seen = (seen == null) ? new HashSet<O>() : seen;
+    }
+
+    /**
+     * Convenience constructor that starts with an empty {@code seen} set.
+     */
+    public ReverseDepthFirstWalker(P plan, LogicalOperator op, LogicalPlan lp,
+            Map<LogicalOperator, O> l2pMap) {
+        this(plan, op, lp, l2pMap, null);
+    }
+
+    /**
+     * Creates a walker for a nested plan. The child shares this walker's
+     * start operator, logical plan, mapping and {@code seen} set.
+     */
+    @Override
+    public PlanWalker<O, P> spawnChildWalker(P plan) {
+        return new ReverseDepthFirstWalker<O, P>(plan, lo, lp, l2pMap, seen);
+    }
+
+    /**
+     * Begin traversing the graph.
+     *
+     * @param visitor
+     *            Visitor this walker is being used by.
+     * @throws VisitorException
+     *             if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        O po = l2pMap.get(lo);
+        // TODO: After use new LO, this should be removed. Found necessary in the testForeach test.
+        if (po == null)
+            return;
+        depthFirst(mPlan.getPredecessors(po), visitor);
+        if (seen.add(po))
+            po.visit(visitor);
+    }
+
+    /**
+     * Visits each unseen operator in {@code predecessors}. Before visiting an
+     * operator, it first recursively visits that operator's predecessors and
+     * soft-link predecessors.
+     *
+     * @param predecessors operators to walk; null is treated as empty
+     * @param visitor      visitor to apply to each operator
+     */
+    private void depthFirst(Collection<O> predecessors,
+            PlanVisitor<O, P> visitor) throws VisitorException {
+        if (predecessors == null)
+            return;
+
+        for (O pred : predecessors) {
+            if (!seen.add(pred))
+                continue; // already visited by a walker sharing this set
+            Collection<O> newPredecessors = Utils.mergeCollection(
+                    mPlan.getPredecessors(pred), mPlan.getSoftLinkPredecessors(pred));
+            depthFirst(newPredecessors, visitor);
+            pred.visit(visitor);
+        }
+    }
+}
Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Mon Dec 13 19:11:00 2010
@@ -96,6 +96,7 @@ public class GruntParser extends PigScri
mDone = false;
mLoadOnly = false;
mExplain = null;
+ mScriptIllustrate = false;
}
private void setBatchOn() {
@@ -202,6 +203,10 @@ public class GruntParser extends PigScri
mJobConf = execEngine.getJobConf();
}
+    /**
+     * Marks this parser as loading a script on behalf of an 'illustrate'
+     * statement. While this is set, processIllustrate rejects any further
+     * 'illustrate' statement with a ParseException.
+     */
+    public void setScriptIllustrate() {
+        this.mScriptIllustrate = true;
+    }
+
@Override
public void prompt()
{
@@ -271,7 +276,7 @@ public class GruntParser extends PigScri
}
setBatchOn();
try {
- loadScript(script, true, true, params, files);
+ loadScript(script, true, true, false, params, files);
} catch(IOException e) {
discardBatch();
throw e;
@@ -416,20 +421,20 @@ public class GruntParser extends PigScri
setBatchOn();
mPigServer.setJobName(script);
try {
- loadScript(script, true, mLoadOnly, params, files);
+ loadScript(script, true, false, mLoadOnly, params, files);
executeBatch();
} finally {
discardBatch();
}
} else {
- loadScript(script, false, mLoadOnly, params, files);
+ loadScript(script, false, false, mLoadOnly, params, files);
}
} else {
log.warn("'run/exec' statement is ignored while processing 'explain -script' or '-check'");
}
}
- private void loadScript(String script, boolean batch, boolean loadOnly,
+ private void loadScript(String script, boolean batch, boolean loadOnly, boolean illustrate,
List<String> params, List<String> files)
throws IOException, ParseException {
@@ -467,6 +472,8 @@ public class GruntParser extends PigScri
parser.setConsoleReader(reader);
parser.setInteractive(interactive);
parser.setLoadOnly(loadOnly);
+ if (illustrate)
+ parser.setScriptIllustrate();
parser.mExplain = mExplain;
parser.prompt();
@@ -621,12 +628,43 @@ public class GruntParser extends PigScri
}
+    /**
+     * Handles the 'illustrate' statement, either on a single alias or on a
+     * whole script.
+     *
+     * @param alias  alias to illustrate; must be null when {@code script} is set
+     * @param script script to load and illustrate as a whole, or null
+     * @param target not read by this method
+     * @param params parameter values passed through to loadScript
+     * @param files  parameter files passed through to loadScript
+     * @throws ParseException if this parser is itself loading a script for
+     *         'illustrate', if both or neither of alias and script are given,
+     *         or if a script is given while opt.multiquery is disabled
+     * @throws IOException if loading the script or generating examples fails
+     */
@Override
- protected void processIllustrate(String alias) throws IOException
+    protected void processIllustrate(String alias, String script, String target, List<String> params, List<String> files) throws IOException, ParseException
{
- if(mExplain == null) { // process only if not in "explain" mode
- mPigServer.getExamples(alias);
- } else {
+        // Set by loadScript(..., illustrate=true, ...): this parser is reading a
+        // script for an outer 'illustrate', so a nested one is not allowed.
+        if (mScriptIllustrate)
+            throw new ParseException("'illustrate' statement can not appear in a script that is illustrated upon.");
+
+        if ((alias != null) && (script != null))
+            throw new ParseException("'illustrate' statement on an alias does not work when a script is in effect");
+        else if (mExplain != null)
log.warn("'illustrate' statement is ignored while processing 'explain -script' or '-check'");
+        else {
+            // Becomes true only after setBatchOn() succeeds. The finally block
+            // therefore calls discardBatch() exactly once, and never for a
+            // batch this call did not open.
+            boolean batchOpened = false;
+            try {
+                if (script != null) {
+                    if (!"true".equalsIgnoreCase(mPigServer.getPigContext()
+                            .getProperties().getProperty("opt.multiquery", "true"))) {
+                        throw new ParseException("Cannot illustrate script if multiquery is disabled.");
+                    }
+                    setBatchOn();
+                    batchOpened = true;
+                    loadScript(script, true, true, true, params, files);
+                } else if (alias == null) {
+                    throw new ParseException("'illustrate' statement must be on an alias or on a script.");
+                }
+                // With a script, alias is null here (both set is rejected
+                // above); presumably getExamples(null) covers the whole
+                // loaded script -- confirm against PigServer.
+                mPigServer.getExamples(alias);
+            } finally {
+                if (batchOpened)
+                    discardBatch();
+            }
}
}
@@ -998,4 +1036,5 @@ public class GruntParser extends PigScri
private int mNumFailedJobs;
private int mNumSucceededJobs;
private FsShell shell;
+ private boolean mScriptIllustrate;
}