You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/26 23:23:35 UTC
svn commit: r699505 [2/2] - in
/incubator/pig/branches/types/src/org/apache/pig/pen: ./ util/
Added: incubator/pig/branches/types/src/org/apache/pig/pen/LineageTrimmingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/LineageTrimmingVisitor.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/LineageTrimmingVisitor.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/LineageTrimmingVisitor.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOCross;
+import org.apache.pig.impl.logicalLayer.LODistinct;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.pen.util.MetricEvaluation;
+import org.apache.pig.pen.util.PreOrderDepthFirstWalker;
+
+public class LineageTrimmingVisitor extends LOVisitor {
+
+ LogicalPlan plan = null;
+ Map<LOLoad, DataBag> baseData = new HashMap<LOLoad, DataBag>();
+ Map<LogicalOperator, PhysicalOperator> LogToPhyMap = null;
+ PhysicalPlan physPlan = null;
+ double completeness = 100.0;
+ Log log = LogFactory.getLog(getClass());
+
+ Map<LogicalOperator, Map<IdentityHashSet<Tuple>, Integer>> AffinityGroups = new HashMap<LogicalOperator, Map<IdentityHashSet<Tuple>, Integer>>();
+ Map<LogicalOperator, LineageTracer> Lineage = new HashMap<LogicalOperator, LineageTracer>();
+
+ boolean continueTrimming;
+ PigContext pc;
+
+ public LineageTrimmingVisitor(LogicalPlan plan,
+ Map<LOLoad, DataBag> baseData,
+ Map<LogicalOperator, PhysicalOperator> LogToPhyMap,
+ PhysicalPlan physPlan, PigContext pc) {
+ super(plan, new PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>(
+ plan));
+ // this.baseData.putAll(baseData);
+ this.baseData = baseData;
+ this.plan = plan;
+ this.LogToPhyMap = LogToPhyMap;
+ this.pc = pc;
+ this.physPlan = physPlan;
+ init();
+ }
+
+ public void init() {
+
+ DerivedDataVisitor visitor = new DerivedDataVisitor(plan, pc, baseData,
+ LogToPhyMap, physPlan);
+ try {
+ visitor.visit();
+ } catch (VisitorException e) {
+ log.error(e.getMessage());
+ }
+
+ LineageTracer lineage = visitor.lineage;
+ Lineage.put(plan.getLeaves().get(0), lineage);
+ Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OpToEqClasses = visitor.OpToEqClasses;
+ Collection<IdentityHashSet<Tuple>> EqClasses = visitor.EqClasses;
+ Map<IdentityHashSet<Tuple>, Integer> affinityGroup = new HashMap<IdentityHashSet<Tuple>, Integer>();
+ for (IdentityHashSet<Tuple> set : EqClasses) {
+ affinityGroup.put(set, 1);
+ }
+ AffinityGroups.put(plan.getLeaves().get(0), affinityGroup);
+ completeness = MetricEvaluation.getCompleteness(null,
+ visitor.derivedData, OpToEqClasses, true);
+ LogToPhyMap = visitor.LogToPhyMap;
+ continueTrimming = true;
+
+ }
+
+ @Override
+ protected void visit(LOCogroup cg) throws VisitorException {
+ if (continueTrimming) {
+ Map<IdentityHashSet<Tuple>, Integer> affinityGroups = null;
+
+ continueTrimming = checkCompleteness(cg);
+
+ DerivedDataVisitor visitor = null;
+ LineageTracer lineage = null;
+ // create affinity groups
+ if (cg.getInputs().size() == 1) {
+ affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
+ LogicalOperator childOp = cg.getInputs().get(0);
+ visitor = new DerivedDataVisitor(childOp, null, baseData,
+ LogToPhyMap, physPlan);
+ try {
+ visitor.visit();
+ } catch (VisitorException e) {
+ log.error(e.getMessage());
+ }
+
+ lineage = visitor.lineage;
+
+ DataBag bag = visitor.evaluateIsolatedOperator(cg);
+ for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+ DataBag field;
+ try {
+ field = (DataBag) it.next().get(1);
+ } catch (ExecException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ log.error(e.getMessage());
+ throw new VisitorException(
+ "Error trimming operator COGROUP operator "
+ + cg.getAlias()
+ + "in example generator");
+ }
+ IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
+ affinityGroups.put(set, 2);
+ for (Iterator<Tuple> it1 = field.iterator(); it1.hasNext();) {
+ set.add(it1.next());
+ }
+ }
+
+ // add the equivalence classes obtained from derived data
+ // creation
+ for (IdentityHashSet<Tuple> set : visitor.EqClasses) {
+ affinityGroups.put(set, 1);
+ }
+ AffinityGroups.put(cg.getInputs().get(0), affinityGroups);
+ Lineage.put(cg.getInputs().get(0), lineage);
+
+ } else {
+ List<DataBag> inputs = new LinkedList<DataBag>();
+ visitor = new DerivedDataVisitor(cg, null, baseData,
+ LogToPhyMap, physPlan);
+ affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
+ for (int i = 0; i < cg.getInputs().size(); i++) {
+ // affinityGroups = new HashMap<IdentityHashSet<Tuple>,
+ // Integer>();
+ LogicalOperator childOp = cg.getInputs().get(i);
+ // visitor = new DerivedDataVisitor(cg.getInputs().get(i),
+ // null, baseData, LogToPhyMap, physPlan);
+ visitor.setOperatorToEvaluate(childOp);
+ try {
+ visitor.visit();
+ } catch (VisitorException e) {
+ log.error(e.getMessage());
+ }
+ // Lineage.put(childOp, visitor.lineage);
+ inputs.add(visitor.derivedData.get(childOp));
+
+ for (IdentityHashSet<Tuple> set : visitor.EqClasses)
+ affinityGroups.put(set, 1);
+
+ // AffinityGroups.put(cg.getInputs().get(i),
+ // affinityGroups);
+ }
+ for (LogicalOperator input : cg.getInputs()) {
+ Lineage.put(input, visitor.lineage);
+ AffinityGroups.put(input, affinityGroups);
+ }
+
+ visitor = new DerivedDataVisitor(cg, null, baseData,
+ LogToPhyMap, physPlan);
+ DataBag output = visitor.evaluateIsolatedOperator(cg, inputs);
+
+ for (int i = 1; i <= cg.getInputs().size(); i++) {
+ affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
+ for (Iterator<Tuple> it = output.iterator(); it.hasNext();) {
+ DataBag bag = null;
+ try {
+ bag = (DataBag) it.next().get(i);
+ } catch (ExecException e) {
+ // TODO Auto-generated catch block
+ log.error(e.getMessage());
+ }
+ IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
+ affinityGroups.put(set, 1);
+ for (Iterator<Tuple> it1 = bag.iterator(); it1
+ .hasNext();) {
+ set.add(it1.next());
+ }
+ }
+ AffinityGroups.get(cg.getInputs().get(i - 1)).putAll(
+ affinityGroups);
+
+ }
+ AffinityGroups = AffinityGroups;
+ }
+ }
+ }
+
+ @Override
+ protected void visit(LOCross cs) throws VisitorException {
+ if(continueTrimming)
+ processOperator(cs);
+
+ }
+
+ @Override
+ protected void visit(LODistinct dt) throws VisitorException {
+ if(continueTrimming)
+ processOperator(dt);
+
+ }
+
+ @Override
+ protected void visit(LOFilter filter) throws VisitorException {
+ if (continueTrimming)
+ processOperator(filter);
+ }
+
+ @Override
+ protected void visit(LOForEach forEach) throws VisitorException {
+ if (continueTrimming)
+ processOperator(forEach);
+ }
+
+ @Override
+ protected void visit(LOLimit limOp) throws VisitorException {
+ if(continueTrimming)
+ processOperator(limOp);
+
+ }
+
+ @Override
+ protected void visit(LOLoad load) throws VisitorException {
+ if (continueTrimming)
+ processOperator(load);
+ }
+
+ @Override
+ protected void visit(LOSort s) throws VisitorException {
+ if(continueTrimming)
+ processOperator(s);
+
+ }
+
+ @Override
+ protected void visit(LOSplit split) throws VisitorException {
+ if(continueTrimming)
+ processOperator(split);
+
+ }
+
+ @Override
+ protected void visit(LOUnion u) throws VisitorException {
+ if(continueTrimming)
+ processOperator(u);
+
+ }
+
+ private Map<LOLoad, DataBag> PruneBaseDataConstrainedCoverage(
+ Map<LOLoad, DataBag> baseData, DataBag rootOutput,
+ LineageTracer lineage,
+ Map<IdentityHashSet<Tuple>, Integer> equivalenceClasses) {
+
+ IdentityHashMap<Tuple, Collection<Tuple>> membershipMap = lineage
+ .getMembershipMap();
+ IdentityHashMap<Tuple, Double> lineageGroupWeights = lineage
+ .getWeightedCounts(2f, 1);
+
+ // compute a mapping from lineage group to the set of equivalence
+ // classes covered by it
+ // IdentityHashMap<Tuple, Set<Integer>> lineageGroupToEquivClasses = new
+ // IdentityHashMap<Tuple, Set<Integer>>();
+ IdentityHashMap<Tuple, Set<IdentityHashSet<Tuple>>> lineageGroupToEquivClasses = new IdentityHashMap<Tuple, Set<IdentityHashSet<Tuple>>>();
+ int equivClassId = 0;
+ for (IdentityHashSet<Tuple> equivClass : equivalenceClasses.keySet()) {
+ for (Tuple t : equivClass) {
+ Tuple lineageGroup = lineage.getRepresentative(t);
+ // Set<Integer> entry =
+ // lineageGroupToEquivClasses.get(lineageGroup);
+ Set<IdentityHashSet<Tuple>> entry = lineageGroupToEquivClasses
+ .get(lineageGroup);
+ if (entry == null) {
+ // entry = new HashSet<Integer>();
+ entry = new HashSet<IdentityHashSet<Tuple>>();
+ lineageGroupToEquivClasses.put(lineageGroup, entry);
+ }
+ // entry.add(equivClassId);
+ entry.add(equivClass);
+ }
+
+ equivClassId++;
+ }
+
+ // select lineage groups such that we cover all equivalence classes
+ IdentityHashSet<Tuple> selectedLineageGroups = new IdentityHashSet<Tuple>();
+ while (!lineageGroupToEquivClasses.isEmpty()) {
+ // greedily find the lineage group with the best "score", where
+ // score = # equiv classes covered / group weight
+ double bestScore = -1;
+ Tuple bestLineageGroup = null;
+ Set<IdentityHashSet<Tuple>> bestEquivClassesCovered = null;
+ for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
+ double weight = lineageGroupWeights.get(lineageGroup);
+
+ Set<IdentityHashSet<Tuple>> equivClassesCovered = lineageGroupToEquivClasses
+ .get(lineageGroup);
+ int numEquivClassesCovered = equivClassesCovered.size();
+ double score = ((double) numEquivClassesCovered)
+ / ((double) weight);
+
+ if (score > bestScore) {
+
+ if (selectedLineageGroups.contains(lineageGroup)) {
+ bestLineageGroup = lineageGroup;
+ bestEquivClassesCovered = equivClassesCovered;
+ continue;
+ }
+
+ bestScore = score;
+ bestLineageGroup = lineageGroup;
+ bestEquivClassesCovered = equivClassesCovered;
+ }
+ }
+ // add the best-scoring lineage group to the set of ones we plan to
+ // retain
+ selectedLineageGroups.add(bestLineageGroup);
+
+ // make copy of bestEquivClassesCovered (or else the code that
+ // follows won't work correctly, because removing from the reference
+ // set)
+ Set<IdentityHashSet<Tuple>> toCopy = bestEquivClassesCovered;
+ bestEquivClassesCovered = new HashSet<IdentityHashSet<Tuple>>();
+ bestEquivClassesCovered.addAll(toCopy);
+
+ // remove the classes we've now covered
+ Collection<Tuple> toRemove = new LinkedList<Tuple>();
+ for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
+
+ Set<IdentityHashSet<Tuple>> equivClasses = lineageGroupToEquivClasses
+ .get(lineageGroup);
+
+ for (Iterator<IdentityHashSet<Tuple>> it = equivClasses
+ .iterator(); it.hasNext();) {
+ IdentityHashSet<Tuple> equivClass = it.next();
+ if (bestEquivClassesCovered.contains(equivClass)) {
+ if ((equivalenceClasses.get(equivClass) - 1) <= 0) {
+ // equivClasses.remove(equivClass);
+ it.remove();
+ }
+ }
+ }
+ if (equivClasses.size() == 0)
+ toRemove.add(lineageGroup);
+
+ }
+ for (Tuple removeMe : toRemove)
+ lineageGroupToEquivClasses.remove(removeMe);
+
+ for (IdentityHashSet<Tuple> equivClass : bestEquivClassesCovered) {
+ equivalenceClasses.put(equivClass, equivalenceClasses
+ .get(equivClass) - 1);
+ }
+ }
+
+ // revise baseData to only contain the tuples that are part of
+ // selectedLineageGroups
+ IdentityHashSet<Tuple> tuplesToRetain = new IdentityHashSet<Tuple>();
+ for (Tuple lineageGroup : selectedLineageGroups) {
+ Collection<Tuple> members = membershipMap.get(lineageGroup);
+ for (Tuple t : members)
+ tuplesToRetain.add(t);
+ }
+ Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
+ for (LOLoad loadOp : baseData.keySet()) {
+ DataBag data = baseData.get(loadOp);
+ // DataBag newData = new DataBag();
+ DataBag newData = BagFactory.getInstance().newDefaultBag();
+ for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ if (tuplesToRetain.contains(t))
+ newData.add(t);
+ }
+ newBaseData.put(loadOp, newData);
+ }
+
+ return newBaseData;
+ }
+
+ private void processOperator(LogicalOperator op) {
+ continueTrimming = checkCompleteness(op);
+
+ if (op instanceof LOLoad || continueTrimming == false)
+ return;
+
+ LogicalOperator childOp = plan.getPredecessors(op).get(0);
+
+ DerivedDataVisitor visitor = new DerivedDataVisitor(childOp, null,
+ baseData, LogToPhyMap, physPlan);
+ try {
+ visitor.visit();
+ } catch (VisitorException e) {
+ log.error(e.getMessage());
+ }
+
+ DataBag bag = visitor.derivedData.get(childOp);
+ Map<IdentityHashSet<Tuple>, Integer> affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
+
+ for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+ IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
+ affinityGroups.put(set, 1);
+ set.add(it.next());
+ }
+
+ for (IdentityHashSet<Tuple> set : visitor.EqClasses) {
+ // newEquivalenceClasses.put(set, 1);
+ affinityGroups.put(set, 1);
+ }
+
+ AffinityGroups.put(childOp, affinityGroups);
+ Lineage.put(childOp, visitor.lineage);
+
+ }
+
+ private boolean checkCompleteness(LogicalOperator op) {
+ LineageTracer lineage = Lineage.get(op);
+ Lineage.remove(op);
+
+ Map<IdentityHashSet<Tuple>, Integer> affinityGroups = AffinityGroups
+ .get(op);
+ AffinityGroups.remove(op);
+
+ Map<LOLoad, DataBag> newBaseData = PruneBaseDataConstrainedCoverage(
+ baseData, null, lineage, affinityGroups);
+
+ // obtain the derived data
+ DerivedDataVisitor visitor = new DerivedDataVisitor(plan, null,
+ newBaseData, LogToPhyMap, physPlan);
+ try {
+ visitor.visit();
+ } catch (VisitorException e) {
+ log.error(e.getMessage());
+ }
+
+ double newCompleteness = MetricEvaluation.getCompleteness(null,
+ visitor.derivedData, visitor.OpToEqClasses, true);
+
+ if (newCompleteness >= completeness) {
+ completeness = newCompleteness;
+ baseData.putAll(newBaseData);
+ } else {
+ continueTrimming = false;
+ }
+
+ return continueTrimming;
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A {@link DependencyOrderWalker} whose walk is restricted to a single
+ * operator and the operators it (transitively) depends on. The visiting
+ * order is whatever the inherited doAllPredecessors() produces when started
+ * from that operator. The plan's other leaves are not walked.
+ */
+public class DependencyOrderLimitedWalker<O extends Operator, P extends OperatorPlan<O>>
+        extends DependencyOrderWalker<O, P> {
+
+    /** Operator whose predecessor closure is walked. */
+    private O target;
+
+    /**
+     * @param operator operator at which the walk is rooted
+     * @param plan     plan that contains {@code operator}
+     */
+    public DependencyOrderLimitedWalker(O operator, P plan) {
+        super(plan);
+        target = operator;
+    }
+
+    /**
+     * Collects {@code target} and its predecessors through the inherited
+     * doAllPredecessors() and visits them in the collected order.
+     */
+    @Override
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        Set<O> visited = new HashSet<O>();
+        List<O> ordering = new ArrayList<O>();
+        doAllPredecessors(target, visited, ordering);
+
+        for (int idx = 0; idx < ordering.size(); idx++) {
+            ordering.get(idx).visit(visitor);
+        }
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/DisplayExamples.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/DisplayExamples.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/DisplayExamples.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/DisplayExamples.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.IdentityHashSet;
+
+//Class containing some generic printing methods to print example data in a simple/tabular form
+public class DisplayExamples {
+
+ public static StringBuffer Result = new StringBuffer();
+ public static final int MAX_DATAATOM_LENGTH = 25;
+
+ static void PrintMetrics(
+ LogicalOperator op,
+ Map<LogicalOperator, DataBag> derivedData,
+ Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses) {
+ /*
+ * System.out.println("Realness : " + Metrics.getRealness(op,
+ * derivedData, true)); System.out.println("Completeness : " +
+ * Metrics.getCompleteness(op, derivedData, OperatorToEqClasses, true));
+ * System.out.println("Consiseness : " + Metrics.getConciseness(op,
+ * derivedData, OperatorToEqClasses, true));
+ */
+ System.out.println("Realness : "
+ + MetricEvaluation.getRealness(op, derivedData, true)
+ + "\n"
+ + "Conciseness : "
+ + MetricEvaluation.getConciseness(op, derivedData,
+ OperatorToEqClasses, true)
+ + "\n"
+ + "Completeness : "
+ + MetricEvaluation.getCompleteness(op, derivedData,
+ OperatorToEqClasses, true) + "\n");
+ }
+
+ public static String PrintTabular(LogicalPlan lp,
+ Map<LogicalOperator, DataBag> exampleData) {
+ StringBuffer output = new StringBuffer();
+
+ LogicalOperator currentOp = lp.getLeaves().get(0);
+ PrintTabular(currentOp, exampleData, output);
+ return output.toString();
+ }
+
+ static void PrintTabular(LogicalOperator op,
+ Map<LogicalOperator, DataBag> exampleData, StringBuffer output) {
+ DataBag bag = exampleData.get(op);
+
+ List<LogicalOperator> inputs = op.getPlan().getPredecessors(op);
+ if (inputs != null) { // to avoid an exception when op == LOLoad
+ for (LogicalOperator Op : inputs) {
+ PrintTabular(Op, exampleData, output);
+ }
+ }
+ if (op.getAlias() != null) {
+ // PrintTable(op, bag, output);
+ try {
+ DisplayTable(MakeArray(op, bag), op, bag, output);
+ } catch (FrontendException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ public static void PrintSimple(LogicalOperator op,
+ Map<LogicalOperator, DataBag> exampleData) {
+ DataBag bag = exampleData.get(op);
+
+ List<LogicalOperator> inputs = op.getPlan().getPredecessors(op);
+ if (inputs != null) {
+ for (LogicalOperator lOp : inputs) {
+ PrintSimple(lOp, exampleData);
+ }
+ }
+ if (op.getAlias() != null) {
+ // PrintTable(op, bag, output);
+ // DisplayTable(MakeArray(op, bag), op, bag, output);
+ System.out.println(op.getAlias() + " : " + bag);
+ }
+ // System.out.println(op.getAlias() + " : " + bag);
+ }
+
+ static String AddSpaces(int n, boolean printSpace) {
+ StringBuffer str = new StringBuffer();
+ for (int i = 0; i < n; ++i) {
+ if (printSpace)
+ str.append(" ");
+ else
+ str.append("-");
+ }
+ return str.toString();
+ }
+
+ static void DisplayTable(String[][] table, LogicalOperator op, DataBag bag,
+ StringBuffer output) throws FrontendException {
+ int cols = op.getSchema().getFields().size();
+ List<FieldSchema> fields = op.getSchema().getFields();
+ int rows = (int) bag.size();
+ int[] maxColSizes = new int[cols];
+ for (int i = 0; i < cols; ++i) {
+ maxColSizes[i] = fields.get(i).toString().length();
+ if (maxColSizes[i] < 5)
+ maxColSizes[i] = 5;
+ }
+ int total = 0;
+ int aliasLength = op.getAlias().length() + 4;
+ for (int j = 0; j < cols; ++j) {
+ for (int i = 0; i < rows; ++i) {
+ int length = table[i][j].length();
+ if (length > maxColSizes[j])
+ maxColSizes[j] = length;
+ }
+ total += maxColSizes[j];
+ }
+
+ // Display the schema first
+ output
+ .append(AddSpaces(total + 3 * (cols + 1) + aliasLength + 1,
+ false)
+ + "\n");
+ output.append("| " + op.getAlias() + AddSpaces(4, true) + " | ");
+ for (int i = 0; i < cols; ++i) {
+ String field = fields.get(i).toString();
+ output.append(field
+ + AddSpaces(maxColSizes[i] - field.length(), true) + " | ");
+ }
+ output.append("\n"
+ + AddSpaces(total + 3 * (cols + 1) + aliasLength + 1, false)
+ + "\n");
+ // now start displaying the data
+ for (int i = 0; i < rows; ++i) {
+ output.append("| " + AddSpaces(aliasLength, true) + " | ");
+ for (int j = 0; j < cols; ++j) {
+ String str = table[i][j];
+ output.append(str
+ + AddSpaces(maxColSizes[j] - str.length(), true)
+ + " | ");
+ }
+ output.append("\n");
+ }
+ // now display the finish line
+ output
+ .append(AddSpaces(total + 3 * (cols + 1) + aliasLength + 1,
+ false)
+ + "\n");
+ }
+
+ static String[][] MakeArray(LogicalOperator op, DataBag bag)
+ throws Exception {
+ int rows = (int) bag.size();
+ int cols = op.getSchema().getFields().size();
+ String[][] table = new String[rows][cols];
+ Iterator<Tuple> it = bag.iterator();
+ for (int i = 0; i < rows; ++i) {
+ Tuple t = it.next();
+ for (int j = 0; j < cols; ++j) {
+ table[i][j] = ShortenField(t.get(j));
+ }
+ }
+ return table;
+ }
+
+ static String ShortenField(Object d) throws ExecException {
+ if (d instanceof Tuple)
+ return ShortenField((Tuple) d);
+ else if (d instanceof DataBag)
+ return ShortenField((DataBag) d);
+ else {
+ // System.out.println("Unrecognized data-type received!!!");
+ // return null;
+ if (DataType.findTypeName(d) != null)
+ return d.toString();
+ }
+ System.out.println("Unrecognized data-type received!!!");
+ return null;
+ }
+
+ static String ShortenField(DataBag bag) throws ExecException {
+ StringBuffer str = new StringBuffer();
+ long size = bag.size();
+ str.append("{");
+ if (size > 3) {
+ Iterator<Tuple> it = bag.iterator();
+ str.append(ShortenField(it.next()));
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ if (!it.hasNext()) {
+ str.append(", ..., " + ShortenField(t));
+ }
+ }
+ } else {
+ for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ if (it.hasNext()) {
+ str.append(ShortenField(t) + ", ");
+ } else
+ str.append(ShortenField(t));
+ }
+ }
+ str.append("}");
+ return str.toString();
+ }
+
+ static String ShortenField(Tuple t) throws ExecException {
+ StringBuffer str = new StringBuffer();
+ int noFields = t.size();
+ str.append("(");
+ if (noFields > 3) {
+
+ Object d = t.get(0);
+ str.append(ShortenField(d) + ", ..., ");
+ d = t.get(noFields - 1);
+
+ str.append(ShortenField(d));
+
+ } else {
+ for (int i = 0; i < noFields; ++i) {
+ Object d = t.get(i);
+
+ if (i != (noFields - 1)) {
+ str.append(ShortenField(d) + ", ");
+ } else {
+ str.append(ShortenField(d));
+ }
+
+ }
+ }
+ str.append(")");
+ return str.toString();
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/ExampleTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/ExampleTuple.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/ExampleTuple.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/ExampleTuple.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * A {@link Tuple} wrapper used by the example generator that carries two
+ * extra flags. Every Tuple operation delegates to the wrapped tuple.
+ * write() serializes the wrapped tuple followed by the flags, and
+ * readFields() reads them back in the same order.
+ */
+public class ExampleTuple implements Tuple {
+    private static final long serialVersionUID = 2L;
+
+    /** Whether the tuple was generated synthetically. */
+    public boolean synthetic = false;
+    /**
+     * Reserved for future use, e.g. attaching weights to tuples that have
+     * already been displayed.
+     */
+    public boolean omittable = true;
+    /** The wrapped tuple. */
+    Tuple t;
+
+    /**
+     * Creates a wrapper without a wrapped tuple. readFields() creates one
+     * on demand; the other methods dereference it and require it to be set.
+     */
+    public ExampleTuple() {
+    }
+
+    /**
+     * Wraps t by reference. Tuple is an interface, so its internal
+     * structure cannot be copied here.
+     */
+    public ExampleTuple(Tuple t) {
+        this.t = t;
+    }
+
+    @Override
+    public String toString() {
+        return t.toString();
+    }
+
+    // Writable methods:
+    public void write(DataOutput out) throws IOException {
+        t.write(out);
+        out.writeBoolean(synthetic);
+        out.writeBoolean(omittable);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        // An instance built with the no-arg constructor (e.g. when being
+        // deserialized) has no wrapped tuple yet.
+        if (t == null) {
+            t = TupleFactory.getInstance().newTuple();
+        }
+        t.readFields(in);
+        this.synthetic = in.readBoolean();
+        this.omittable = in.readBoolean();
+    }
+
+    /** Returns the wrapped tuple. */
+    public Tuple toTuple() {
+        return t;
+    }
+
+    public void append(Object val) {
+        t.append(val);
+    }
+
+    public Object get(int fieldNum) throws ExecException {
+        return t.get(fieldNum);
+    }
+
+    public List<Object> getAll() {
+        return t.getAll();
+    }
+
+    public long getMemorySize() {
+        return t.getMemorySize();
+    }
+
+    public byte getType(int fieldNum) throws ExecException {
+        return t.getType(fieldNum);
+    }
+
+    public boolean isNull(int fieldNum) throws ExecException {
+        return t.isNull(fieldNum);
+    }
+
+    public boolean isNull() {
+        return t.isNull();
+    }
+
+    /**
+     * Makes the wrapped tuple reference the contents of t. The parameter
+     * shadows the field, so the field must be accessed as this.t; a plain
+     * t.reference(t) would only act on the argument itself.
+     */
+    public void reference(Tuple t) {
+        this.t.reference(t);
+    }
+
+    public void set(int fieldNum, Object val) throws ExecException {
+        t.set(fieldNum, val);
+    }
+
+    public void setNull(boolean isNull) {
+        t.setNull(isNull);
+    }
+
+    public int size() {
+        return t.size();
+    }
+
+    public String toDelimitedString(String delim) throws ExecException {
+        return t.toDelimitedString(delim);
+    }
+
+    public int compareTo(Object o) {
+        return t.compareTo(o);
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.optimizer.ImplicitSplitInserter;
+import org.apache.pig.impl.logicalLayer.optimizer.OpLimitOptimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.StreamOptimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.TypeCastInserter;
+import org.apache.pig.impl.plan.optimizer.PlanOptimizer;
+import org.apache.pig.impl.plan.optimizer.Rule;
+
+// This optimizer puts in only the bare minimum of modifications needed to make
+// sure the plan is functional.
+public class FunctionalLogicalOptimizer extends
+        PlanOptimizer<LogicalOperator, LogicalPlan> {
+
+    /** Class name matched by the rule that inserts type casts for loads. */
+    public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad";
+
+    /** Class name matched by the rule that inserts type casts for streams. */
+    public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream";
+
+    /**
+     * Creates the optimizer for {@code plan} and registers its rules:
+     * implicit-split insertion first (the type cast inserters depend on it),
+     * then type cast insertion for loads, then for streams.
+     *
+     * @param plan logical plan to be made functional
+     */
+    public FunctionalLogicalOptimizer(LogicalPlan plan) {
+        super(plan);
+
+        // This one has to be first, as the type cast inserter expects the
+        // load to only have one output.
+        // Find any places in the plan that have an implicit split and make
+        // it explicit. Since the RuleMatcher doesn't handle trees properly,
+        // we cheat and say that we match any node. Then we'll do the actual
+        // test in the transformer's check method.
+        List<String> nodes = new ArrayList<String>(1);
+        Map<Integer, Integer> edges = new HashMap<Integer, Integer>();
+        List<Boolean> required = new ArrayList<Boolean>(1);
+        nodes.add("any");
+        required.add(true);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+                required, new ImplicitSplitInserter(plan)));
+
+        // Add type casting to plans where the schema has been declared (by
+        // user, data, or data catalog).
+        nodes = new ArrayList<String>(1);
+        nodes.add(LOLOAD_CLASSNAME);
+        edges = new HashMap<Integer, Integer>();
+        required = new ArrayList<Boolean>(1);
+        required.add(true);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+                required, new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
+
+        // Add type casting to plans where the schema has been declared by
+        // user in a statement with stream operator.
+        nodes = new ArrayList<String>(1);
+        nodes.add(LOSTREAM_CLASSNAME);
+        edges = new HashMap<Integer, Integer>();
+        required = new ArrayList<Boolean>(1);
+        required.add(true);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+                required, new TypeCastInserter(plan, LOSTREAM_CLASSNAME)));
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/LineageTracer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/LineageTracer.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/LineageTracer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/LineageTracer.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.util;
+
+import java.util.*;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.IdentityHashSet;
+
+public class LineageTracer {
+
+ // Use textbook Union-Find data structure, with counts associated with items
+
+ // note: we test for equality by comparing tuple references, not by calling
+ // the "equals()" method
+ // the "IdentityHashMap" data structure is based on reference equality
+ IdentityHashMap<Tuple, Tuple> parents = new IdentityHashMap<Tuple, Tuple>();
+ IdentityHashMap<Tuple, Integer> counts = new IdentityHashMap<Tuple, Integer>(); // has
+ // one
+ // entry
+ // per
+ // unique
+ // tuple
+ // being
+ // tracked
+ IdentityHashMap<Tuple, Integer> ranks = new IdentityHashMap<Tuple, Integer>();
+
+ // insert a new tuple (if a tuple is inserted multiple times, it gets a
+ // count > 1)
+ public void insert(Tuple t) {
+ if (parents.containsKey(t)) {
+ counts.put(t, counts.get(t) + 1);
+ } else {
+ parents.put(t, t);
+ counts.put(t, 1);
+ ranks.put(t, 0);
+ }
+ }
+
+ // union two tuple sets
+ public void union(Tuple t1, Tuple t2) {
+ link(getRepresentative(t1), getRepresentative(t2));
+ }
+
+ // find the set representative of a given tuple
+ public Tuple getRepresentative(Tuple t) {
+ Tuple tParent = parents.get(t);
+ if (tParent != t) {
+ tParent = getRepresentative(tParent);
+ parents.put(t, tParent);
+ }
+ return tParent;
+ }
+
+ private void link(Tuple t1, Tuple t2) {
+ int t1Rank = ranks.get(t1);
+ int t2Rank = ranks.get(t2);
+ if (t1Rank > t2Rank) {
+ parents.put(t2, t1);
+ } else {
+ parents.put(t1, t2);
+ if (t1Rank == t2Rank)
+ ranks.put(t2, t2Rank + 1);
+ }
+ }
+
+ // get the cardinality of each tuple set (identified by a representative
+ // tuple)
+ public IdentityHashMap<Tuple, Double> getCounts() {
+ return getWeightedCounts(2f, 1f);
+ }
+
+ // get the cardinality of each tuple set, weighted in a special way
+ // weighting works like this: if a tuple set contains one or more tuples
+ // from the "specialTuples" set, we multiply its value by "multiplier"
+ // public IdentityHashMap<Tuple, Integer>
+ // getWeightedCounts(IdentityHashSet<Tuple> specialTuples, int multiplier) {
+ // IdentityHashMap<Tuple, Integer> repCounts = new IdentityHashMap<Tuple,
+ // Integer>();
+ // IdentityHashSet<Tuple> specialSets = new IdentityHashSet<Tuple>();
+ //
+ // for (IdentityHashMap.Entry<Tuple, Integer> e : counts.entrySet()) {
+ // Tuple t = e.getKey();
+ //
+ // int newCount = counts.get(t);
+ // Tuple rep = getRepresentative(t);
+ // int oldCount = (repCounts.containsKey(rep))? repCounts.get(rep) : 0;
+ // repCounts.put(rep, oldCount + newCount);
+ // if (specialTuples.contains(t)) specialSets.add(rep);
+ // }
+ //
+ // for (IdentityHashMap.Entry<Tuple, Integer> e : repCounts.entrySet()) {
+ // if (specialSets.contains(e.getKey())) e.setValue(e.getValue() *
+ // multiplier);
+ // }
+ //
+ // return repCounts;
+ // }
+
+ public IdentityHashMap<Tuple, Double> getWeightedCounts(
+ float syntheticMultipler, float omittableMultiplier) {
+ IdentityHashMap<Tuple, Double> repCounts = new IdentityHashMap<Tuple, Double>();
+
+ for (IdentityHashMap.Entry<Tuple, Integer> e : counts.entrySet()) {
+ Tuple t = e.getKey();
+
+ float newCount = counts.get(t);
+ if (((ExampleTuple) t).synthetic)
+ newCount = newCount * syntheticMultipler;
+ if (((ExampleTuple) t).omittable)
+ newCount = newCount * omittableMultiplier;
+
+ Tuple rep = getRepresentative(t);
+ double oldCount = (repCounts.containsKey(rep)) ? repCounts.get(rep)
+ : 0;
+ repCounts.put(rep, oldCount + newCount);
+ // if (specialTuples.contains(t)) specialSets.add(rep);
+ }
+ /*
+ * for (IdentityHashMap.Entry<Tuple, Integer> e : repCounts.entrySet())
+ * { if (specialSets.contains(e.getKey())) e.setValue(e.getValue()
+ * multiplier); }
+ */
+ return repCounts;
+ }
+
+ // get all members of the set containing t
+ public Collection<Tuple> getMembers(Tuple t) {
+ Tuple representative = getRepresentative(t);
+
+ Collection<Tuple> members = new LinkedList<Tuple>();
+ for (IdentityHashMap.Entry<Tuple, Integer> e : counts.entrySet()) {
+ Tuple t1 = e.getKey();
+ if (getRepresentative(t1) == representative)
+ members.add(t1);
+ }
+ return members;
+ }
+
+ // get a mapping from set representatives to members
+ public IdentityHashMap<Tuple, Collection<Tuple>> getMembershipMap() {
+ IdentityHashMap<Tuple, Collection<Tuple>> map = new IdentityHashMap<Tuple, Collection<Tuple>>();
+ for (IdentityHashMap.Entry<Tuple, Integer> e : counts.entrySet()) {
+ Tuple t = e.getKey();
+
+ Tuple representative = getRepresentative(t);
+ Collection<Tuple> members = map.get(representative);
+ if (members == null) {
+ members = new LinkedList<Tuple>();
+ map.put(representative, members);
+ }
+ members.add(t);
+ }
+ return map;
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/MetricEvaluation.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/MetricEvaluation.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/MetricEvaluation.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/MetricEvaluation.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.util.IdentityHashSet;
+
+// Evaluates various metrics (realness, conciseness, completeness) of the
+// example data generated for a logical plan. All metrics are percentages.
+public class MetricEvaluation {
+
+    /**
+     * Computes realness: the percentage of example tuples that are not
+     * synthetic.
+     *
+     * @param op operator to measure when {@code overallRealness} is false
+     * @param exampleData example data per operator; every tuple must be an
+     *            {@link ExampleTuple}
+     * @param overallRealness if true, pool the tuples of every operator that
+     *            has an alias (and ignore {@code op}); otherwise measure only
+     *            {@code op}'s data
+     * @return realness in [0, 100], or 0 if there are no tuples to measure
+     */
+    public static float getRealness(LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData, boolean overallRealness) {
+        long noTuples = 0;
+        long noSynthetic = 0;
+        if (overallRealness) {
+            for (Map.Entry<LogicalOperator, DataBag> e : exampleData.entrySet()) {
+                // Operators without an alias are not counted.
+                if (e.getKey().getAlias() == null)
+                    continue;
+                DataBag bag = e.getValue();
+                noTuples += bag.size();
+                noSynthetic += countSynthetic(bag);
+            }
+        } else {
+            // Measure op's own data, whether or not any aliased operator is
+            // present in exampleData.
+            DataBag bag = exampleData.get(op);
+            noTuples = bag.size();
+            noSynthetic = countSynthetic(bag);
+        }
+
+        if (noTuples == 0)
+            return 0.0f;
+        return 100 * (1 - ((float) noSynthetic / (float) noTuples));
+    }
+
+    /**
+     * Computes conciseness: the number of equivalence classes as a percentage
+     * of the number of example tuples, capped at 100.
+     *
+     * @param op operator to measure when {@code overallConciseness} is false
+     * @param exampleData example data per operator
+     * @param operatorToEqClasses equivalence classes per operator
+     * @param overallConciseness if true, average the conciseness of every
+     *            operator in {@code operatorToEqClasses} that has an alias
+     *            (and ignore {@code op}); otherwise measure only {@code op}
+     * @return conciseness in [0, 100]; an operator with no tuples counts as
+     *         100, and the overall average is 0 if no operator has an alias
+     */
+    public static float getConciseness(
+            LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData,
+            Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> operatorToEqClasses,
+            boolean overallConciseness) {
+        if (!overallConciseness) {
+            long noTuples = exampleData.get(op).size();
+            int noEqCl = operatorToEqClasses.get(op).size();
+            return cappedConciseness(noEqCl, noTuples);
+        }
+
+        float conciseness = 0;
+        // Counted explicitly rather than using operatorToEqClasses.size(),
+        // because operators without an alias are skipped.
+        int noOperators = 0;
+        for (Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> e : operatorToEqClasses
+                .entrySet()) {
+            LogicalOperator lop = e.getKey();
+            if (lop.getAlias() == null)
+                continue;
+            noOperators++;
+            conciseness += cappedConciseness(e.getValue().size(),
+                    exampleData.get(lop).size());
+        }
+        return (noOperators == 0) ? 0.0f : conciseness / noOperators;
+    }
+
+    /**
+     * Computes completeness: the percentage of an operator's equivalence
+     * classes that are covered by its example data. For an {@link LOFilter}
+     * the data of the filter's input is used instead of its own.
+     *
+     * @param op operator to measure when {@code overallCompleteness} is false
+     * @param exampleData example data per operator
+     * @param operatorToEqClasses equivalence classes per operator
+     * @param overallCompleteness if true, average the completeness of every
+     *            operator in {@code operatorToEqClasses} that has an alias
+     *            (and ignore {@code op}); otherwise measure only {@code op}
+     * @return completeness in [0, 100]; an operator with no equivalence
+     *         classes counts as 100, and the overall average is 0 if no
+     *         operator has an alias
+     */
+    public static float getCompleteness(
+            LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData,
+            Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> operatorToEqClasses,
+            boolean overallCompleteness) {
+        if (!overallCompleteness) {
+            Collection<IdentityHashSet<Tuple>> eqClasses = operatorToEqClasses
+                    .get(op);
+            int noClasses = eqClasses.size();
+            if (noClasses == 0)
+                return 100.0f;
+            int noCoveredClasses = countCoveredClasses(
+                    dataFor(op, exampleData), eqClasses);
+            return 100 * ((float) noCoveredClasses) / (float) noClasses;
+        }
+
+        float completeness = 0;
+        int noOperators = 0;
+        for (Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> e : operatorToEqClasses
+                .entrySet()) {
+            LogicalOperator lop = e.getKey();
+            // Operators without an alias are skipped, so that e.g. a join
+            // is considered a single operator.
+            if (lop.getAlias() == null)
+                continue;
+            noOperators++;
+            Collection<IdentityHashSet<Tuple>> eqClasses = e.getValue();
+            int noClasses = eqClasses.size();
+            if (noClasses == 0) {
+                completeness += 100.0f;
+            } else {
+                int noCoveredClasses = countCoveredClasses(
+                        dataFor(lop, exampleData), eqClasses);
+                completeness += 100 * ((float) noCoveredClasses / (float) noClasses);
+            }
+        }
+        return (noOperators == 0) ? 0.0f : completeness / noOperators;
+    }
+
+    // Counts the tuples in bag that are marked synthetic.
+    private static long countSynthetic(DataBag bag) {
+        long noSynthetic = 0;
+        for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+            if (((ExampleTuple) it.next()).synthetic)
+                noSynthetic++;
+        }
+        return noSynthetic;
+    }
+
+    // Equivalence classes per tuple as a percentage, capped at 100; a bag
+    // with no tuples counts as fully concise.
+    private static float cappedConciseness(int noEqCl, long noTuples) {
+        if (noTuples == 0)
+            return 100.0f;
+        float concise = 100 * ((float) noEqCl / (float) noTuples);
+        return (concise > 100.0f) ? 100.0f : concise;
+    }
+
+    // Data used to judge an operator's completeness: a filter is judged on
+    // its input, every other operator on its own output.
+    private static DataBag dataFor(LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData) {
+        if (op instanceof LOFilter)
+            return exampleData.get(((LOFilter) op).getInput());
+        return exampleData.get(op);
+    }
+
+    // Counts the classes covered by bag: a class is covered if it contains a
+    // tuple of bag, or if it is empty and bag has at least one tuple.
+    private static int countCoveredClasses(DataBag bag,
+            Collection<IdentityHashSet<Tuple>> eqClasses) {
+        boolean[] covered = new boolean[eqClasses.size()];
+        int noCovered = 0;
+
+        for (Iterator<Tuple> it = bag.iterator(); it.hasNext()
+                && noCovered < covered.length;) {
+            Tuple t = it.next();
+            int classId = 0;
+            for (IdentityHashSet<Tuple> eqClass : eqClasses) {
+                if (!covered[classId]
+                        && (eqClass.contains(t) || eqClass.size() == 0)) {
+                    covered[classId] = true;
+                    noCovered++;
+                }
+                classId++;
+            }
+        }
+        return noCovered;
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A plan walker that starts at the plan's leaves and follows predecessor
+ * links towards the roots, visiting each operator before recursing into its
+ * predecessors. Each operator is visited at most once.
+ */
+public class PreOrderDepthFirstWalker<O extends Operator, P extends OperatorPlan<O>>
+        extends PlanWalker<O, P> {
+
+    /**
+     * @param plan
+     *            Plan for this walker to traverse.
+     */
+    public PreOrderDepthFirstWalker(P plan) {
+        super(plan);
+    }
+
+    /**
+     * Begin traversing the graph, starting from its leaves.
+     *
+     * @param visitor
+     *            Visitor this walker is being used by.
+     * @throws VisitorException
+     *             if an error is encountered while walking.
+     */
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        visitFrom(mPlan.getLeaves(), new HashSet<O>(), visitor);
+    }
+
+    /**
+     * Returns the walker used for a nested plan.
+     * NOTE(review): this is a plain DepthFirstWalker, not a
+     * PreOrderDepthFirstWalker -- confirm that is intended.
+     */
+    public PlanWalker<O, P> spawnChildWalker(P plan) {
+        return new DepthFirstWalker<O, P>(plan);
+    }
+
+    // For each operator in ops not yet visited: mark it, visit it, then
+    // recurse into its predecessors. A null collection is treated as empty.
+    private void visitFrom(Collection<O> ops, Set<O> visited,
+            PlanVisitor<O, P> visitor) throws VisitorException {
+        if (ops == null) {
+            return;
+        }
+        for (O op : ops) {
+            if (!visited.add(op)) {
+                continue;
+            }
+            op.visit(visitor);
+            visitFrom(mPlan.getPredecessors(op), visited, visitor);
+        }
+    }
+}