You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/26 23:23:35 UTC

svn commit: r699505 [2/2] - in /incubator/pig/branches/types/src/org/apache/pig/pen: ./ util/

Added: incubator/pig/branches/types/src/org/apache/pig/pen/LineageTrimmingVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/LineageTrimmingVisitor.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/LineageTrimmingVisitor.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/LineageTrimmingVisitor.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOCross;
+import org.apache.pig.impl.logicalLayer.LODistinct;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.pen.util.MetricEvaluation;
+import org.apache.pig.pen.util.PreOrderDepthFirstWalker;
+
+public class LineageTrimmingVisitor extends LOVisitor {
+
+    LogicalPlan plan = null;
+    Map<LOLoad, DataBag> baseData = new HashMap<LOLoad, DataBag>();
+    Map<LogicalOperator, PhysicalOperator> LogToPhyMap = null;
+    PhysicalPlan physPlan = null;
+    double completeness = 100.0;
+    Log log = LogFactory.getLog(getClass());
+
+    Map<LogicalOperator, Map<IdentityHashSet<Tuple>, Integer>> AffinityGroups = new HashMap<LogicalOperator, Map<IdentityHashSet<Tuple>, Integer>>();
+    Map<LogicalOperator, LineageTracer> Lineage = new HashMap<LogicalOperator, LineageTracer>();
+
+    boolean continueTrimming;
+    PigContext pc;
+
+    /**
+     * Builds a trimming visitor over {@code plan}, walked with a
+     * {@link PreOrderDepthFirstWalker}, and immediately calls {@link #init()}.
+     * <p>
+     * {@code baseData} is stored by reference rather than copied.
+     *
+     * @param plan        logical plan to walk
+     * @param baseData    base data bag for each load operator
+     * @param LogToPhyMap mapping from logical to physical operators
+     * @param physPlan    physical plan passed on by {@link #init()}
+     * @param pc          Pig context passed on by {@link #init()}
+     */
+    public LineageTrimmingVisitor(LogicalPlan plan,
+            Map<LOLoad, DataBag> baseData,
+            Map<LogicalOperator, PhysicalOperator> LogToPhyMap,
+            PhysicalPlan physPlan, PigContext pc) {
+        super(plan,
+                new PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+        this.plan = plan;
+        this.pc = pc;
+        this.physPlan = physPlan;
+        this.LogToPhyMap = LogToPhyMap;
+        // keep the caller's map itself (no defensive copy)
+        this.baseData = baseData;
+        // must run last: init() reads every field assigned above
+        init();
+    }
+
+    /**
+     * Evaluates the complete plan once with a {@link DerivedDataVisitor} and
+     * seeds the trimming state from the result: the lineage and the affinity
+     * groups (one per equivalence class, each with count 1) are registered
+     * for the plan's first leaf, the baseline completeness is computed, and
+     * trimming is switched on.
+     * <p>
+     * A {@link VisitorException} raised by the evaluation is logged and
+     * otherwise ignored.
+     */
+    public void init() {
+        DerivedDataVisitor derived = new DerivedDataVisitor(plan, pc, baseData,
+                LogToPhyMap, physPlan);
+        try {
+            derived.visit();
+        } catch (VisitorException e) {
+            log.error(e.getMessage());
+        }
+
+        LogicalOperator leaf = plan.getLeaves().get(0);
+        Lineage.put(leaf, derived.lineage);
+
+        Map<IdentityHashSet<Tuple>, Integer> leafGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
+        for (IdentityHashSet<Tuple> eqClass : derived.EqClasses) {
+            leafGroups.put(eqClass, 1);
+        }
+        AffinityGroups.put(leaf, leafGroups);
+
+        completeness = MetricEvaluation.getCompleteness(null,
+                derived.derivedData, derived.OpToEqClasses, true);
+        LogToPhyMap = derived.LogToPhyMap;
+        continueTrimming = true;
+    }
+
+    /**
+     * Trims the base data at a COGROUP and records, for every input of the
+     * cogroup, the lineage and affinity groups that later visits consume.
+     * <p>
+     * With a single input, every bag in field 1 of the cogroup output
+     * becomes an affinity group with count 2. With several inputs, the bags
+     * in fields 1..n of the output become affinity groups with count 1. In
+     * both cases the equivalence classes found while deriving the input data
+     * are added with count 1.
+     *
+     * @param cg the cogroup operator being visited
+     * @throws VisitorException if a bag cannot be read from the cogroup output
+     */
+    @Override
+    protected void visit(LOCogroup cg) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+
+        continueTrimming = checkCompleteness(cg);
+
+        if (cg.getInputs().size() == 1) {
+            LogicalOperator childOp = cg.getInputs().get(0);
+            DerivedDataVisitor visitor = new DerivedDataVisitor(childOp, null,
+                    baseData, LogToPhyMap, physPlan);
+            try {
+                visitor.visit();
+            } catch (VisitorException e) {
+                log.error(e.getMessage(), e);
+            }
+
+            Map<IdentityHashSet<Tuple>, Integer> affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
+            DataBag grouped = visitor.evaluateIsolatedOperator(cg);
+            for (Iterator<Tuple> it = grouped.iterator(); it.hasNext();) {
+                DataBag field = cogroupedBag(cg, it.next(), 1);
+                affinityGroups.put(toIdentitySet(field), 2);
+            }
+
+            // add the equivalence classes obtained from derived data
+            // creation
+            for (IdentityHashSet<Tuple> set : visitor.EqClasses) {
+                affinityGroups.put(set, 1);
+            }
+            AffinityGroups.put(childOp, affinityGroups);
+            Lineage.put(childOp, visitor.lineage);
+            return;
+        }
+
+        // several inputs: derive each input with one shared visitor so that
+        // all of them end up in the same lineage
+        List<DataBag> inputs = new LinkedList<DataBag>();
+        DerivedDataVisitor visitor = new DerivedDataVisitor(cg, null, baseData,
+                LogToPhyMap, physPlan);
+        Map<IdentityHashSet<Tuple>, Integer> sharedGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
+        for (LogicalOperator childOp : cg.getInputs()) {
+            visitor.setOperatorToEvaluate(childOp);
+            try {
+                visitor.visit();
+            } catch (VisitorException e) {
+                log.error(e.getMessage(), e);
+            }
+            inputs.add(visitor.derivedData.get(childOp));
+
+            for (IdentityHashSet<Tuple> set : visitor.EqClasses) {
+                sharedGroups.put(set, 1);
+            }
+        }
+        // NOTE: every input is registered with the SAME map instance, so the
+        // putAll calls further down are visible through all inputs.
+        for (LogicalOperator input : cg.getInputs()) {
+            Lineage.put(input, visitor.lineage);
+            AffinityGroups.put(input, sharedGroups);
+        }
+
+        visitor = new DerivedDataVisitor(cg, null, baseData, LogToPhyMap,
+                physPlan);
+        DataBag output = visitor.evaluateIsolatedOperator(cg, inputs);
+
+        // field 0 of the cogroup output is skipped; field i is read for
+        // input i - 1
+        for (int i = 1; i <= cg.getInputs().size(); i++) {
+            Map<IdentityHashSet<Tuple>, Integer> perInput = new HashMap<IdentityHashSet<Tuple>, Integer>();
+            for (Iterator<Tuple> it = output.iterator(); it.hasNext();) {
+                // a read failure now aborts the visit instead of leaving a
+                // null bag that is dereferenced on the next line
+                DataBag bag = cogroupedBag(cg, it.next(), i);
+                perInput.put(toIdentitySet(bag), 1);
+            }
+            AffinityGroups.get(cg.getInputs().get(i - 1)).putAll(perInput);
+        }
+    }
+
+    /**
+     * Reads field {@code index} of one cogroup output tuple as a bag.
+     *
+     * @throws VisitorException if the field cannot be read; the underlying
+     *             {@link ExecException} is logged with its stack trace
+     */
+    private DataBag cogroupedBag(LOCogroup cg, Tuple row, int index)
+            throws VisitorException {
+        try {
+            return (DataBag) row.get(index);
+        } catch (ExecException e) {
+            log.error(e.getMessage(), e);
+            throw new VisitorException("Error trimming COGROUP operator "
+                    + cg.getAlias() + " in example generator");
+        }
+    }
+
+    /**
+     * Copies the tuples of {@code bag} into a fresh set. The set is filled
+     * completely before the caller uses it as a map key.
+     */
+    private IdentityHashSet<Tuple> toIdentitySet(DataBag bag) {
+        IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
+        for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+            set.add(it.next());
+        }
+        return set;
+    }
+
+    // Every operator below is handled the same way: while trimming is still
+    // enabled, delegate to processOperator; otherwise do nothing.
+
+    /** Trims at a CROSS operator while trimming is still enabled. */
+    @Override
+    protected void visit(LOCross cs) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(cs);
+    }
+
+    /** Trims at a DISTINCT operator while trimming is still enabled. */
+    @Override
+    protected void visit(LODistinct dt) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(dt);
+    }
+
+    /** Trims at a FILTER operator while trimming is still enabled. */
+    @Override
+    protected void visit(LOFilter filter) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(filter);
+    }
+
+    /** Trims at a FOREACH operator while trimming is still enabled. */
+    @Override
+    protected void visit(LOForEach forEach) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(forEach);
+    }
+
+    /** Trims at a LIMIT operator while trimming is still enabled. */
+    @Override
+    protected void visit(LOLimit limOp) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(limOp);
+    }
+
+    /** Trims at a LOAD operator while trimming is still enabled. */
+    @Override
+    protected void visit(LOLoad load) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(load);
+    }
+
+    /** Trims at a SORT operator while trimming is still enabled. */
+    @Override
+    protected void visit(LOSort s) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(s);
+    }
+
+    /** Trims at a SPLIT operator while trimming is still enabled. */
+    @Override
+    protected void visit(LOSplit split) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(split);
+    }
+
+    /** Trims at a UNION operator while trimming is still enabled. */
+    @Override
+    protected void visit(LOUnion u) throws VisitorException {
+        if (!continueTrimming) {
+            return;
+        }
+        processOperator(u);
+    }
+
+    /**
+     * Greedily selects lineage groups until every supplied equivalence class
+     * has been covered as often as its count requires, then returns a copy
+     * of {@code baseData} that keeps only the tuples belonging to the
+     * selected lineage groups.
+     *
+     * @param baseData           base data per load operator; read, not
+     *                           modified
+     * @param rootOutput         not referenced anywhere in this method
+     * @param lineage            lineage tracer supplying representatives,
+     *                           group membership and group weights; must not
+     *                           be null (it is dereferenced immediately)
+     * @param equivalenceClasses equivalence classes mapped to the number of
+     *                           times each still has to be covered; the
+     *                           counts are decremented in place
+     * @return a new map with one bag per load operator of {@code baseData},
+     *         holding only the retained tuples
+     */
+    private Map<LOLoad, DataBag> PruneBaseDataConstrainedCoverage(
+            Map<LOLoad, DataBag> baseData, DataBag rootOutput,
+            LineageTracer lineage,
+            Map<IdentityHashSet<Tuple>, Integer> equivalenceClasses) {
+
+        IdentityHashMap<Tuple, Collection<Tuple>> membershipMap = lineage
+                .getMembershipMap();
+        // NOTE(review): the meaning of the (2f, 1) arguments is defined by
+        // LineageTracer.getWeightedCounts - confirm there.
+        IdentityHashMap<Tuple, Double> lineageGroupWeights = lineage
+                .getWeightedCounts(2f, 1);
+
+        // compute a mapping from lineage group to the set of equivalence
+        // classes covered by it
+        // IdentityHashMap<Tuple, Set<Integer>> lineageGroupToEquivClasses = new
+        // IdentityHashMap<Tuple, Set<Integer>>();
+        IdentityHashMap<Tuple, Set<IdentityHashSet<Tuple>>> lineageGroupToEquivClasses = new IdentityHashMap<Tuple, Set<IdentityHashSet<Tuple>>>();
+        // NOTE(review): equivClassId is incremented below but never read.
+        int equivClassId = 0;
+        for (IdentityHashSet<Tuple> equivClass : equivalenceClasses.keySet()) {
+            for (Tuple t : equivClass) {
+                // a lineage group is identified by its representative tuple
+                Tuple lineageGroup = lineage.getRepresentative(t);
+                // Set<Integer> entry =
+                // lineageGroupToEquivClasses.get(lineageGroup);
+                Set<IdentityHashSet<Tuple>> entry = lineageGroupToEquivClasses
+                        .get(lineageGroup);
+                if (entry == null) {
+                    // entry = new HashSet<Integer>();
+                    entry = new HashSet<IdentityHashSet<Tuple>>();
+                    lineageGroupToEquivClasses.put(lineageGroup, entry);
+                }
+                // entry.add(equivClassId);
+                entry.add(equivClass);
+            }
+
+            equivClassId++;
+        }
+
+        // select lineage groups such that we cover all equivalence classes
+        IdentityHashSet<Tuple> selectedLineageGroups = new IdentityHashSet<Tuple>();
+        // the loop ends once every lineage group has had all of its classes
+        // removed (see the removal pass below)
+        while (!lineageGroupToEquivClasses.isEmpty()) {
+            // greedily find the lineage group with the best "score", where
+            // score = # equiv classes covered / group weight
+            double bestScore = -1;
+            Tuple bestLineageGroup = null;
+            Set<IdentityHashSet<Tuple>> bestEquivClassesCovered = null;
+            for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
+                // auto-unboxes: a lineage group without a weight entry would
+                // throw a NullPointerException here
+                double weight = lineageGroupWeights.get(lineageGroup);
+
+                Set<IdentityHashSet<Tuple>> equivClassesCovered = lineageGroupToEquivClasses
+                        .get(lineageGroup);
+                int numEquivClassesCovered = equivClassesCovered.size();
+                double score = ((double) numEquivClassesCovered)
+                        / ((double) weight);
+
+                if (score > bestScore) {
+
+                    // an already-selected group becomes the tentative best
+                    // WITHOUT raising bestScore, so any later group scoring
+                    // above the old bestScore still replaces it
+                    if (selectedLineageGroups.contains(lineageGroup)) {
+                        bestLineageGroup = lineageGroup;
+                        bestEquivClassesCovered = equivClassesCovered;
+                        continue;
+                    }
+
+                    bestScore = score;
+                    bestLineageGroup = lineageGroup;
+                    bestEquivClassesCovered = equivClassesCovered;
+                }
+            }
+            // add the best-scoring lineage group to the set of ones we plan to
+            // retain
+            selectedLineageGroups.add(bestLineageGroup);
+
+            // make copy of bestEquivClassesCovered (or else the code that
+            // follows won't work correctly, because removing from the reference
+            // set)
+            Set<IdentityHashSet<Tuple>> toCopy = bestEquivClassesCovered;
+            bestEquivClassesCovered = new HashSet<IdentityHashSet<Tuple>>();
+            bestEquivClassesCovered.addAll(toCopy);
+
+            // remove the classes we've now covered
+            // (groups to drop are collected first and removed after the
+            // iteration over the map's key set has finished)
+            Collection<Tuple> toRemove = new LinkedList<Tuple>();
+            for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
+
+                Set<IdentityHashSet<Tuple>> equivClasses = lineageGroupToEquivClasses
+                        .get(lineageGroup);
+
+                for (Iterator<IdentityHashSet<Tuple>> it = equivClasses
+                        .iterator(); it.hasNext();) {
+                    IdentityHashSet<Tuple> equivClass = it.next();
+                    if (bestEquivClassesCovered.contains(equivClass)) {
+                        // a covered class is dropped only when this cover
+                        // uses up its remaining count; classes with a higher
+                        // count stay and are merely decremented below
+                        if ((equivalenceClasses.get(equivClass) - 1) <= 0) {
+                            // equivClasses.remove(equivClass);
+                            it.remove();
+                        }
+                    }
+                }
+                if (equivClasses.size() == 0)
+                    toRemove.add(lineageGroup);
+
+            }
+            for (Tuple removeMe : toRemove)
+                lineageGroupToEquivClasses.remove(removeMe);
+
+            // charge one cover against every class the chosen group covered
+            // (this writes into the caller's equivalenceClasses map)
+            for (IdentityHashSet<Tuple> equivClass : bestEquivClassesCovered) {
+                equivalenceClasses.put(equivClass, equivalenceClasses
+                        .get(equivClass) - 1);
+            }
+        }
+
+        // revise baseData to only contain the tuples that are part of
+        // selectedLineageGroups
+        IdentityHashSet<Tuple> tuplesToRetain = new IdentityHashSet<Tuple>();
+        for (Tuple lineageGroup : selectedLineageGroups) {
+            Collection<Tuple> members = membershipMap.get(lineageGroup);
+            for (Tuple t : members)
+                tuplesToRetain.add(t);
+        }
+        // build a fresh bag per load operator; the input bags are untouched
+        Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
+        for (LOLoad loadOp : baseData.keySet()) {
+            DataBag data = baseData.get(loadOp);
+            // DataBag newData = new DataBag();
+            DataBag newData = BagFactory.getInstance().newDefaultBag();
+            for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
+                Tuple t = it.next();
+                if (tuplesToRetain.contains(t))
+                    newData.add(t);
+            }
+            newBaseData.put(loadOp, newData);
+        }
+
+        return newBaseData;
+    }
+
+    /**
+     * Shared handling for every operator other than COGROUP: tries to trim
+     * the base data at {@code op} and, unless trimming stopped or {@code op}
+     * is a load, registers lineage and affinity groups for the first
+     * predecessor of {@code op}.
+     * <p>
+     * Each tuple of the predecessor's derived data becomes a singleton
+     * affinity group with count 1; the equivalence classes found while
+     * deriving that data are added with count 1 as well.
+     *
+     * @param op the operator being visited
+     */
+    private void processOperator(LogicalOperator op) {
+        continueTrimming = checkCompleteness(op);
+        if (!continueTrimming || op instanceof LOLoad) {
+            return;
+        }
+
+        // only the first predecessor is considered
+        LogicalOperator input = plan.getPredecessors(op).get(0);
+
+        DerivedDataVisitor derived = new DerivedDataVisitor(input, null,
+                baseData, LogToPhyMap, physPlan);
+        try {
+            derived.visit();
+        } catch (VisitorException e) {
+            log.error(e.getMessage());
+        }
+
+        Map<IdentityHashSet<Tuple>, Integer> groups = new HashMap<IdentityHashSet<Tuple>, Integer>();
+
+        Iterator<Tuple> tuples = derived.derivedData.get(input).iterator();
+        while (tuples.hasNext()) {
+            IdentityHashSet<Tuple> singleton = new IdentityHashSet<Tuple>();
+            groups.put(singleton, 1);
+            singleton.add(tuples.next());
+        }
+
+        for (IdentityHashSet<Tuple> eqClass : derived.EqClasses) {
+            groups.put(eqClass, 1);
+        }
+
+        AffinityGroups.put(input, groups);
+        Lineage.put(input, derived.lineage);
+    }
+
+    /**
+     * Tries to prune the base data using the lineage and affinity groups
+     * recorded for {@code op}, and keeps the pruned data only if the plan's
+     * completeness does not drop.
+     * <p>
+     * The entries recorded for {@code op} are consumed (removed) by this
+     * call. If none were recorded - for example because {@code op} is not
+     * the first predecessor of its successor - there is nothing to prune
+     * with, so the base data is left alone and the current trimming state is
+     * returned unchanged instead of failing with a NullPointerException.
+     *
+     * @param op the operator whose recorded lineage drives the pruning
+     * @return {@code true} if trimming may continue, {@code false} if the
+     *         pruned data lowered completeness (in which case it is
+     *         discarded and {@code continueTrimming} is cleared)
+     */
+    private boolean checkCompleteness(LogicalOperator op) {
+        // Map.remove returns the value that was mapped, so one call both
+        // fetches and consumes the entry
+        LineageTracer lineage = Lineage.remove(op);
+        Map<IdentityHashSet<Tuple>, Integer> affinityGroups = AffinityGroups
+                .remove(op);
+
+        if (lineage == null || affinityGroups == null) {
+            log.debug("No lineage recorded for operator " + op.getAlias()
+                    + "; skipping trimming at this operator");
+            return continueTrimming;
+        }
+
+        Map<LOLoad, DataBag> newBaseData = PruneBaseDataConstrainedCoverage(
+                baseData, null, lineage, affinityGroups);
+
+        // obtain the derived data
+        DerivedDataVisitor visitor = new DerivedDataVisitor(plan, null,
+                newBaseData, LogToPhyMap, physPlan);
+        try {
+            visitor.visit();
+        } catch (VisitorException e) {
+            // keep the stack trace; the message alone is rarely enough
+            log.error(e.getMessage(), e);
+        }
+
+        double newCompleteness = MetricEvaluation.getCompleteness(null,
+                visitor.derivedData, visitor.OpToEqClasses, true);
+
+        if (newCompleteness >= completeness) {
+            completeness = newCompleteness;
+            baseData.putAll(newBaseData);
+        } else {
+            continueTrimming = false;
+        }
+
+        return continueTrimming;
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A {@link DependencyOrderWalker} whose walk is limited to what
+ * {@code doAllPredecessors} collects for one given operator, instead of
+ * covering the whole plan.
+ *
+ * @param <O> operator type
+ * @param <P> plan type
+ */
+public class DependencyOrderLimitedWalker<O extends Operator, P extends OperatorPlan<O>>
+        extends DependencyOrderWalker<O, P> {
+
+    /** Operator at which the limited walk is anchored. */
+    private final O operator;
+
+    /**
+     * @param operator operator whose predecessors are to be walked
+     * @param plan     plan containing {@code operator}
+     */
+    public DependencyOrderLimitedWalker(O operator, P plan) {
+        super(plan);
+        this.operator = operator;
+    }
+
+    /**
+     * Collects operators via the inherited {@code doAllPredecessors},
+     * starting from the anchor operator, and visits them in the order in
+     * which they were collected.
+     *
+     * @param visitor visitor applied to each collected operator
+     * @throws VisitorException if visiting an operator fails
+     */
+    @Override
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        Set<O> visited = new HashSet<O>();
+        List<O> ordered = new ArrayList<O>();
+        doAllPredecessors(operator, visited, ordered);
+
+        for (O next : ordered) {
+            next.visit(visitor);
+        }
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/DisplayExamples.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/DisplayExamples.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/DisplayExamples.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/DisplayExamples.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.util;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.IdentityHashSet;
+
+//Class containing some generic printing methods to print example data in a simple/tabular form
+public class DisplayExamples {
+
+    public static StringBuffer Result = new StringBuffer();
+    public static final int MAX_DATAATOM_LENGTH = 25;
+
+    /**
+     * Prints the realness, conciseness and completeness metrics, as computed
+     * by {@link MetricEvaluation}, to standard output.
+     *
+     * @param op                  operator passed to the metric functions
+     * @param derivedData         example data per operator
+     * @param OperatorToEqClasses equivalence classes per operator
+     */
+    static void PrintMetrics(
+            LogicalOperator op,
+            Map<LogicalOperator, DataBag> derivedData,
+            Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses) {
+        StringBuilder report = new StringBuilder();
+        report.append("Realness : ");
+        report.append(MetricEvaluation.getRealness(op, derivedData, true));
+        report.append("\n");
+        report.append("Conciseness : ");
+        report.append(MetricEvaluation.getConciseness(op, derivedData,
+                OperatorToEqClasses, true));
+        report.append("\n");
+        report.append("Completeness : ");
+        report.append(MetricEvaluation.getCompleteness(op, derivedData,
+                OperatorToEqClasses, true));
+        report.append("\n");
+        System.out.println(report.toString());
+    }
+
+    /**
+     * Renders the example data of the plan's first leaf and, recursively, of
+     * everything feeding it, as text tables.
+     *
+     * @param lp          plan whose first leaf is the starting point
+     * @param exampleData example data per operator
+     * @return the rendered tables
+     */
+    public static String PrintTabular(LogicalPlan lp,
+            Map<LogicalOperator, DataBag> exampleData) {
+        StringBuffer rendered = new StringBuffer();
+        PrintTabular(lp.getLeaves().get(0), exampleData, rendered);
+        return rendered.toString();
+    }
+
+    /**
+     * Appends the tables of all predecessors of {@code op} to {@code output}
+     * and then, if {@code op} has an alias, the table of {@code op} itself.
+     * Any exception raised while rendering the table of {@code op} is
+     * printed to standard error and rendering carries on.
+     *
+     * @param op          operator to render
+     * @param exampleData example data per operator
+     * @param output      buffer receiving the rendered tables
+     */
+    static void PrintTabular(LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData, StringBuffer output) {
+        DataBag bag = exampleData.get(op);
+
+        // predecessors first, so inputs appear above the operators using
+        // them; the list is null for operators without predecessors
+        List<LogicalOperator> predecessors = op.getPlan().getPredecessors(op);
+        if (predecessors != null) {
+            for (LogicalOperator predecessor : predecessors) {
+                PrintTabular(predecessor, exampleData, output);
+            }
+        }
+
+        if (op.getAlias() == null) {
+            return;
+        }
+        try {
+            DisplayTable(MakeArray(op, bag), op, bag, output);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Prints, for all predecessors of {@code op} and then for {@code op}
+     * itself, one line of the form {@code alias : bag} to standard output.
+     * Operators without an alias are not printed.
+     *
+     * @param op          operator to print
+     * @param exampleData example data per operator
+     */
+    public static void PrintSimple(LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData) {
+        DataBag examples = exampleData.get(op);
+
+        List<LogicalOperator> predecessors = op.getPlan().getPredecessors(op);
+        if (predecessors != null) {
+            for (Iterator<LogicalOperator> it = predecessors.iterator(); it
+                    .hasNext();) {
+                PrintSimple(it.next(), exampleData);
+            }
+        }
+
+        if (op.getAlias() == null) {
+            return;
+        }
+        System.out.println(op.getAlias() + " : " + examples);
+    }
+
+    /**
+     * Builds a run of {@code n} identical characters.
+     *
+     * @param n          number of characters; zero or less yields an empty
+     *                   string
+     * @param printSpace {@code true} for spaces, {@code false} for dashes
+     * @return the padding or rule string
+     */
+    static String AddSpaces(int n, boolean printSpace) {
+        String fill = printSpace ? " " : "-";
+        StringBuffer run = new StringBuffer();
+        int remaining = n;
+        while (remaining > 0) {
+            run.append(fill);
+            remaining--;
+        }
+        return run.toString();
+    }
+
+    /**
+     * Appends a text table for {@code op} to {@code output}: a rule line, a
+     * header row with the alias and the schema fields, a rule line, one row
+     * per tuple, and a closing rule line.
+     * <p>
+     * Every column is at least five characters wide and otherwise as wide as
+     * its longest header or cell. A {@code null} cell is rendered as an
+     * empty cell rather than causing a NullPointerException.
+     *
+     * @param table  cell texts, indexed {@code [row][column]}
+     * @param op     operator supplying the alias and the schema
+     * @param bag    example data; only its size is used, as the row count
+     * @param output buffer receiving the table
+     * @throws FrontendException if the schema of {@code op} cannot be
+     *             obtained
+     */
+    static void DisplayTable(String[][] table, LogicalOperator op, DataBag bag,
+            StringBuffer output) throws FrontendException {
+        List<FieldSchema> fields = op.getSchema().getFields();
+        int cols = fields.size();
+        int rows = (int) bag.size();
+
+        // column width = max(5, header length, longest cell)
+        int[] maxColSizes = new int[cols];
+        int total = 0;
+        for (int j = 0; j < cols; ++j) {
+            maxColSizes[j] = Math.max(5, fields.get(j).toString().length());
+            for (int i = 0; i < rows; ++i) {
+                String cell = table[i][j];
+                if (cell != null && cell.length() > maxColSizes[j])
+                    maxColSizes[j] = cell.length();
+            }
+            total += maxColSizes[j];
+        }
+
+        int aliasLength = op.getAlias().length() + 4;
+        // the same rule line frames the header and closes the table
+        String rule = AddSpaces(total + 3 * (cols + 1) + aliasLength + 1,
+                false)
+                + "\n";
+
+        // Display the schema first
+        output.append(rule);
+        output.append("| " + op.getAlias() + AddSpaces(4, true) + " | ");
+        for (int j = 0; j < cols; ++j) {
+            String field = fields.get(j).toString();
+            output.append(field
+                    + AddSpaces(maxColSizes[j] - field.length(), true) + " | ");
+        }
+        output.append("\n" + rule);
+
+        // now start displaying the data
+        for (int i = 0; i < rows; ++i) {
+            output.append("| " + AddSpaces(aliasLength, true) + " | ");
+            for (int j = 0; j < cols; ++j) {
+                String str = table[i][j] == null ? "" : table[i][j];
+                output.append(str
+                        + AddSpaces(maxColSizes[j] - str.length(), true)
+                        + " | ");
+            }
+            output.append("\n");
+        }
+
+        // now display the finish line
+        output.append(rule);
+    }
+
+    /**
+     * Converts the tuples of {@code bag} into a grid of shortened cell
+     * texts, one row per tuple and one column per schema field of
+     * {@code op}.
+     *
+     * @param op  operator whose schema fixes the number of columns
+     * @param bag example data to convert
+     * @return the cell texts, indexed {@code [row][column]}
+     * @throws Exception if the schema or a tuple field cannot be read
+     */
+    static String[][] MakeArray(LogicalOperator op, DataBag bag)
+            throws Exception {
+        int rows = (int) bag.size();
+        int cols = op.getSchema().getFields().size();
+        String[][] grid = new String[rows][cols];
+
+        Iterator<Tuple> tuples = bag.iterator();
+        int row = 0;
+        while (row < rows) {
+            Tuple tuple = tuples.next();
+            String[] cells = grid[row];
+            for (int col = 0; col < cols; ++col) {
+                cells[col] = ShortenField(tuple.get(col));
+            }
+            ++row;
+        }
+        return grid;
+    }
+
+    /**
+     * Renders one field value as short display text. Tuples and bags are
+     * delegated to their dedicated overloads; any other value with a type
+     * name known to {@link DataType} is rendered with {@code toString()}.
+     * <p>
+     * A {@code null} field is rendered as an empty string, so that callers
+     * measuring or printing the result do not fail on null data.
+     *
+     * @param d field value, may be {@code null}
+     * @return the display text, or {@code null} if the type of {@code d} is
+     *         not recognized (a notice is printed to standard output)
+     * @throws ExecException if a nested tuple field cannot be read
+     */
+    static String ShortenField(Object d) throws ExecException {
+        if (d == null)
+            return "";
+        if (d instanceof Tuple)
+            return ShortenField((Tuple) d);
+        if (d instanceof DataBag)
+            return ShortenField((DataBag) d);
+        if (DataType.findTypeName(d) != null)
+            return d.toString();
+
+        System.out.println("Unrecognized data-type received!!!");
+        return null;
+    }
+
+    /**
+     * Renders a bag as {@code {t1, t2, t3}}. A bag reporting more than three
+     * tuples is abbreviated to its first and last tuple:
+     * {@code {first, ..., last}}.
+     *
+     * @param bag bag to render
+     * @return the display text
+     * @throws ExecException if a tuple field cannot be read
+     */
+    static String ShortenField(DataBag bag) throws ExecException {
+        StringBuffer text = new StringBuffer();
+        boolean abbreviate = bag.size() > 3;
+        text.append("{");
+
+        Iterator<Tuple> tuples = bag.iterator();
+        if (abbreviate) {
+            text.append(ShortenField(tuples.next()));
+            // skip ahead to the final tuple; everything in between is elided
+            boolean sawMore = false;
+            Tuple last = null;
+            while (tuples.hasNext()) {
+                last = tuples.next();
+                sawMore = true;
+            }
+            if (sawMore) {
+                text.append(", ..., " + ShortenField(last));
+            }
+        } else {
+            boolean first = true;
+            while (tuples.hasNext()) {
+                Tuple tuple = tuples.next();
+                if (!first) {
+                    text.append(", ");
+                }
+                text.append(ShortenField(tuple));
+                first = false;
+            }
+        }
+
+        text.append("}");
+        return text.toString();
+    }
+
+    /**
+     * Renders a tuple as {@code (f1, f2, f3)}. A tuple with more than three
+     * fields is abbreviated to its first and last field:
+     * {@code (first, ..., last)}.
+     *
+     * @param t tuple to render
+     * @return the display text
+     * @throws ExecException if a field cannot be read
+     */
+    static String ShortenField(Tuple t) throws ExecException {
+        StringBuffer text = new StringBuffer();
+        int fieldCount = t.size();
+        text.append("(");
+
+        if (fieldCount > 3) {
+            Object first = t.get(0);
+            text.append(ShortenField(first) + ", ..., ");
+            Object last = t.get(fieldCount - 1);
+            text.append(ShortenField(last));
+        } else {
+            int lastIndex = fieldCount - 1;
+            for (int i = 0; i < fieldCount; ++i) {
+                Object field = t.get(i);
+                String separator = (i == lastIndex) ? "" : ", ";
+                text.append(ShortenField(field) + separator);
+            }
+        }
+
+        text.append(")");
+        return text.toString();
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/ExampleTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/ExampleTuple.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/ExampleTuple.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/ExampleTuple.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * A {@link Tuple} wrapper that adds two boolean flags to a tuple:
+ * <ul>
+ * <li>{@code synthetic} says whether the tuple was generated synthetically;</li>
+ * <li>{@code omittable} is for future use in case we want to attach weights
+ * to tuples that have been displayed earlier.</li>
+ * </ul>
+ * Every {@code Tuple} operation is forwarded to the wrapped tuple.
+ */
+public class ExampleTuple implements Tuple {
+    private static final long serialVersionUID = 2L;
+
+    public boolean synthetic = false;
+    public boolean omittable = true;
+    // The wrapped tuple that all Tuple calls are forwarded to.
+    Tuple t;
+
+    /**
+     * Creates a wrapper that has no wrapped tuple yet.
+     * {@link #readFields(DataInput)} creates one on demand; any other
+     * forwarding method called before a tuple has been assigned fails with a
+     * {@code NullPointerException}.
+     */
+    public ExampleTuple() {
+
+    }
+
+    /**
+     * Wraps an existing tuple.
+     *
+     * @param t
+     *            the tuple to forward to; it is stored by reference, not copied
+     */
+    public ExampleTuple(Tuple t) {
+        // Have to do it like this because Tuple is an interface, we don't
+        // have access to its internal structures.
+        this.t = t;
+
+    }
+
+    /** Returns the string form of the wrapped tuple (the flags are not shown). */
+    @Override
+    public String toString() {
+        return t.toString();
+    }
+
+    // Writable methods:
+
+    /**
+     * Writes the wrapped tuple followed by the {@code synthetic} and
+     * {@code omittable} flags.
+     */
+    public void write(DataOutput out) throws IOException {
+        t.write(out);
+        out.writeBoolean(synthetic);
+        out.writeBoolean(omittable);
+    }
+
+    /**
+     * Reads the wrapped tuple followed by the two flags, in the order used by
+     * {@link #write(DataOutput)}.
+     */
+    public void readFields(DataInput in) throws IOException {
+        if (t == null) {
+            // An instance built with the no-arg constructor has nothing to
+            // read into yet, so give it an empty tuple first.
+            t = TupleFactory.getInstance().newTuple();
+        }
+        t.readFields(in);
+        this.synthetic = in.readBoolean();
+        this.omittable = in.readBoolean();
+    }
+
+    /** Returns the wrapped tuple itself. */
+    public Tuple toTuple() {
+        return t;
+    }
+
+    public void append(Object val) {
+        t.append(val);
+
+    }
+
+    public Object get(int fieldNum) throws ExecException {
+        return t.get(fieldNum);
+    }
+
+    public List<Object> getAll() {
+        return t.getAll();
+    }
+
+    public long getMemorySize() {
+        return t.getMemorySize();
+    }
+
+    public byte getType(int fieldNum) throws ExecException {
+        return t.getType(fieldNum);
+    }
+
+    public boolean isNull(int fieldNum) throws ExecException {
+        return t.isNull(fieldNum);
+    }
+
+    public boolean isNull() {
+        return t.isNull();
+    }
+
+    /**
+     * Forwards {@code reference} to the wrapped tuple.
+     *
+     * @param t
+     *            the tuple the wrapped tuple should reference
+     */
+    public void reference(Tuple t) {
+        // The parameter shadows the field, so the wrapped tuple has to be
+        // named explicitly; a bare t.reference(t) would make the argument
+        // reference itself and leave this wrapper untouched.
+        this.t.reference(t);
+    }
+
+    public void set(int fieldNum, Object val) throws ExecException {
+        t.set(fieldNum, val);
+    }
+
+    public void setNull(boolean isNull) {
+        t.setNull(isNull);
+    }
+
+    public int size() {
+        return t.size();
+    }
+
+    public String toDelimitedString(String delim) throws ExecException {
+        return t.toDelimitedString(delim);
+    }
+
+    /** Compares the wrapped tuple with {@code o}; the flags take no part. */
+    public int compareTo(Object o) {
+        return t.compareTo(o);
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.optimizer.ImplicitSplitInserter;
+import org.apache.pig.impl.logicalLayer.optimizer.OpLimitOptimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.StreamOptimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.TypeCastInserter;
+import org.apache.pig.impl.plan.optimizer.PlanOptimizer;
+import org.apache.pig.impl.plan.optimizer.Rule;
+
+/**
+ * This optimiser puts in the bare minimum modifications needed to make sure
+ * the plan is functional. It registers three rules, in this order: implicit
+ * split insertion, type cast insertion after loads, and type cast insertion
+ * after stream operators.
+ */
+public class FunctionalLogicalOptimizer extends
+        PlanOptimizer<LogicalOperator, LogicalPlan> {
+
+    public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad";
+    public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream";
+
+    /**
+     * Builds the optimizer and registers its rules for the given plan.
+     *
+     * @param plan
+     *            logical plan the rules' transformers operate on
+     */
+    public FunctionalLogicalOptimizer(LogicalPlan plan) {
+        super(plan);
+
+        // List of rules for the logical optimizer
+
+        // This one has to be first, as the type cast inserter expects the
+        // load to only have one output.
+        // Find any places in the plan that have an implicit split and make
+        // it explicit. Since the RuleMatcher doesn't handle trees properly,
+        // we cheat and say that we match any node. Then we'll do the actual
+        // test in the transformers check method.
+        List<String> nodes = new ArrayList<String>(1);
+        Map<Integer, Integer> edges = new HashMap<Integer, Integer>();
+        List<Boolean> required = new ArrayList<Boolean>(1);
+        nodes.add("any");
+        required.add(true);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+                required, new ImplicitSplitInserter(plan)));
+
+        // Add type casting to plans where the schema has been declared (by
+        // user, data, or data catalog).
+        nodes = new ArrayList<String>(1);
+        nodes.add(LOLOAD_CLASSNAME);
+        edges = new HashMap<Integer, Integer>();
+        required = new ArrayList<Boolean>(1);
+        required.add(true);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+                required, new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
+
+        // Add type casting to plans where the schema has been declared by
+        // user in a statement with stream operator.
+        nodes = new ArrayList<String>(1);
+        nodes.add(LOSTREAM_CLASSNAME);
+        edges = new HashMap<Integer, Integer>();
+        required = new ArrayList<Boolean>(1);
+        required.add(true);
+        // Parameterized like the two rules above rather than a raw Rule, so
+        // the add is type-checked instead of producing an unchecked warning.
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+                required, new TypeCastInserter(plan, LOSTREAM_CLASSNAME)));
+
+    }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/LineageTracer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/LineageTracer.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/LineageTracer.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/LineageTracer.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.util;
+
+import java.util.*;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.IdentityHashSet;
+
+/**
+ * Groups tuples into disjoint sets and keeps an insertion count per tuple.
+ * <p>
+ * Uses a textbook Union-Find data structure (union by rank, path
+ * compression), with counts associated with items.
+ * <p>
+ * Note: tuples are tested for equality by comparing references, not by
+ * calling {@code equals()}; the {@code IdentityHashMap} data structure is
+ * based on reference equality.
+ */
+public class LineageTracer {
+
+    // parent link of every tracked tuple; a set representative is its own parent
+    IdentityHashMap<Tuple, Tuple> parents = new IdentityHashMap<Tuple, Tuple>();
+    // has one entry per unique tuple being tracked: how often it was inserted
+    IdentityHashMap<Tuple, Integer> counts = new IdentityHashMap<Tuple, Integer>();
+    // union-by-rank bookkeeping, one entry per tracked tuple
+    IdentityHashMap<Tuple, Integer> ranks = new IdentityHashMap<Tuple, Integer>();
+
+    /**
+     * Inserts a new tuple as a singleton set. If a tuple is inserted multiple
+     * times, it gets a count > 1.
+     */
+    public void insert(Tuple t) {
+        if (parents.containsKey(t)) {
+            counts.put(t, counts.get(t) + 1);
+        } else {
+            parents.put(t, t);
+            counts.put(t, 1);
+            ranks.put(t, 0);
+        }
+    }
+
+    /**
+     * Unions the two tuple sets containing {@code t1} and {@code t2}. Both
+     * tuples must have been inserted before; otherwise linking fails with a
+     * {@code NullPointerException}.
+     */
+    public void union(Tuple t1, Tuple t2) {
+        link(getRepresentative(t1), getRepresentative(t2));
+    }
+
+    /**
+     * Finds the set representative of a given tuple, compressing the path to
+     * it on the way.
+     *
+     * @return the representative, or {@code null} if {@code t} was never
+     *         inserted
+     */
+    public Tuple getRepresentative(Tuple t) {
+        Tuple tParent = parents.get(t);
+        if (tParent == null) {
+            // t is not tracked. Return without touching the maps: storing a
+            // null parent for it would make a later insert(t) take the
+            // "already present" path and fail on its missing count.
+            return null;
+        }
+        if (tParent != t) {
+            tParent = getRepresentative(tParent);
+            parents.put(t, tParent);
+        }
+        return tParent;
+    }
+
+    // Hangs the lower-ranked representative under the higher-ranked one; on a
+    // tie t2 becomes the parent and its rank grows by one.
+    private void link(Tuple t1, Tuple t2) {
+        int t1Rank = ranks.get(t1);
+        int t2Rank = ranks.get(t2);
+        if (t1Rank > t2Rank) {
+            parents.put(t2, t1);
+        } else {
+            parents.put(t1, t2);
+            if (t1Rank == t2Rank)
+                ranks.put(t2, t2Rank + 1);
+        }
+    }
+
+    /**
+     * Gets the count of each tuple set (identified by a representative
+     * tuple). This is {@link #getWeightedCounts(float, float)} with a
+     * synthetic multiplier of 2 and an omittable multiplier of 1, so
+     * synthetic tuples weigh double.
+     */
+    public IdentityHashMap<Tuple, Double> getCounts() {
+        return getWeightedCounts(2f, 1f);
+    }
+
+    /**
+     * Gets the weighted count of each tuple set, keyed by set representative.
+     * <p>
+     * Each tracked tuple contributes its insertion count, multiplied by
+     * {@code syntheticMultipler} if it is a synthetic {@link ExampleTuple}
+     * and by {@code omittableMultiplier} if it is an omittable one. Tuples
+     * that are not {@code ExampleTuple}s carry no flags and contribute their
+     * plain count.
+     *
+     * @param syntheticMultipler
+     *            weight applied to synthetic tuples
+     * @param omittableMultiplier
+     *            weight applied to omittable tuples
+     * @return map from set representative to the summed weight of its members
+     */
+    public IdentityHashMap<Tuple, Double> getWeightedCounts(
+            float syntheticMultipler, float omittableMultiplier) {
+        IdentityHashMap<Tuple, Double> repCounts = new IdentityHashMap<Tuple, Double>();
+
+        for (Map.Entry<Tuple, Integer> e : counts.entrySet()) {
+            Tuple t = e.getKey();
+
+            float newCount = e.getValue();
+            // insert() accepts any Tuple, so only read the flags when the
+            // tuple actually has them instead of casting blindly.
+            if (t instanceof ExampleTuple) {
+                ExampleTuple example = (ExampleTuple) t;
+                if (example.synthetic)
+                    newCount = newCount * syntheticMultipler;
+                if (example.omittable)
+                    newCount = newCount * omittableMultiplier;
+            }
+
+            Tuple rep = getRepresentative(t);
+            double oldCount = (repCounts.containsKey(rep)) ? repCounts.get(rep)
+                    : 0;
+            repCounts.put(rep, oldCount + newCount);
+        }
+        return repCounts;
+    }
+
+    /**
+     * Gets all members of the set containing {@code t}. The result is empty
+     * if {@code t} was never inserted.
+     */
+    public Collection<Tuple> getMembers(Tuple t) {
+        Tuple representative = getRepresentative(t);
+
+        Collection<Tuple> members = new LinkedList<Tuple>();
+        for (Map.Entry<Tuple, Integer> e : counts.entrySet()) {
+            Tuple t1 = e.getKey();
+            if (getRepresentative(t1) == representative)
+                members.add(t1);
+        }
+        return members;
+    }
+
+    /** Gets a mapping from set representatives to the members of their set. */
+    public IdentityHashMap<Tuple, Collection<Tuple>> getMembershipMap() {
+        IdentityHashMap<Tuple, Collection<Tuple>> map = new IdentityHashMap<Tuple, Collection<Tuple>>();
+        for (Map.Entry<Tuple, Integer> e : counts.entrySet()) {
+            Tuple t = e.getKey();
+
+            Tuple representative = getRepresentative(t);
+            Collection<Tuple> members = map.get(representative);
+            if (members == null) {
+                members = new LinkedList<Tuple>();
+                map.put(representative, members);
+            }
+            members.add(t);
+        }
+        return map;
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/MetricEvaluation.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/MetricEvaluation.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/MetricEvaluation.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/MetricEvaluation.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.util.IdentityHashSet;
+
+/**
+ * Evaluates various metrics (realness, conciseness, completeness) of example
+ * data, either for a single operator or averaged over all operators that
+ * have an alias. All metrics are percentages.
+ */
+public class MetricEvaluation {
+    /**
+     * Computes the percentage of example tuples that are not synthetic.
+     * <p>
+     * Operators without an alias are skipped. In overall mode the tuples of
+     * every aliased operator are counted; otherwise only the bag of
+     * {@code op} is counted, and only if {@code exampleData} holds at least
+     * one aliased operator. Tuples that are not {@link ExampleTuple}s carry
+     * no synthetic flag and count as real.
+     *
+     * @param op
+     *            operator to evaluate when {@code overallRealness} is false
+     * @param exampleData
+     *            example bag per operator
+     * @param overallRealness
+     *            whether to evaluate all operators instead of {@code op}
+     * @return realness in percent; 0 when no tuples were counted
+     */
+    public static float getRealness(LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData, boolean overallRealness) {
+        int noTuples = 0;
+        int noSynthetic = 0;
+        for (Map.Entry<LogicalOperator, DataBag> e : exampleData.entrySet()) {
+            if (e.getKey().getAlias() == null)
+                continue;
+            DataBag bag = overallRealness ? e.getValue() : exampleData.get(op);
+            noTuples += bag.size();
+            for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+                Tuple t = it.next();
+                // only ExampleTuples have the flag; do not cast blindly
+                if (t instanceof ExampleTuple && ((ExampleTuple) t).synthetic)
+                    noSynthetic++;
+            }
+            if (!overallRealness)
+                break;
+
+        }
+
+        if (noTuples == 0) {
+            if (noSynthetic == 0)
+                return 0.0f;
+            else
+                return 100.0f;
+        }
+        return 100 * (1 - ((float) noSynthetic / (float) noTuples));
+
+    }
+
+    /**
+     * Computes conciseness: the number of equivalence classes relative to the
+     * number of example tuples, in percent and capped at 100.
+     * <p>
+     * In overall mode the result is the average over all aliased operators
+     * and {@code op} is not used.
+     *
+     * @param op
+     *            operator to evaluate when {@code overallConciseness} is false
+     * @param exampleData
+     *            example bag per operator
+     * @param OperatorToEqClasses
+     *            equivalence classes per operator
+     * @param overallConciseness
+     *            whether to average over all operators instead of {@code op}
+     * @return conciseness in percent; 0 in overall mode when there is no
+     *         aliased operator
+     */
+    public static float getConciseness(
+            LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData,
+            Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses,
+            boolean overallConciseness) {
+        if (!overallConciseness) {
+            // op's entries are looked up only in this mode, so overall mode
+            // no longer fails when op has no data.
+            DataBag bag = exampleData.get(op);
+            int noEqCl = OperatorToEqClasses.get(op).size();
+            return concisenessOf(noEqCl, bag.size());
+        }
+
+        float conciseness = 0;
+        int noOperators = 0;
+        for (Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> e : OperatorToEqClasses
+                .entrySet()) {
+            if (e.getKey().getAlias() == null)
+                continue;
+            noOperators++; // we need to keep a track of these and not use
+                           // OperatorToEqClasses.size() as LORead shouldn't
+                           // be considered a operator
+            DataBag bag = exampleData.get(e.getKey());
+            conciseness += concisenessOf(e.getValue().size(), bag.size());
+        }
+        if (noOperators == 0) {
+            // nothing to average; 0/0 would be NaN
+            return 0.0f;
+        }
+        return conciseness / (float) noOperators;
+
+    }
+
+    // Conciseness of one operator in percent, capped at 100. With no tuples
+    // the ratio is undefined: report 0 when there are no classes either
+    // (instead of NaN) and 100 otherwise (what the capped infinite ratio
+    // amounts to).
+    private static float concisenessOf(int noEqCl, long noTuples) {
+        if (noTuples == 0) {
+            return (noEqCl == 0) ? 0.0f : 100.0f;
+        }
+        float concise = 100 * ((float) noEqCl / (float) noTuples);
+        return (concise > 100) ? 100 : concise;
+    }
+
+    /**
+     * Computes completeness: the percentage of an operator's equivalence
+     * classes that are covered by the example data.
+     * <p>
+     * For an {@code LOFilter} the example bag of the filter's input is used
+     * instead of the filter's own bag. In overall mode the result is the
+     * average over all aliased operators. An operator without equivalence
+     * classes contributes 0.
+     *
+     * @param op
+     *            operator to evaluate when {@code overallCompleteness} is
+     *            false
+     * @param exampleData
+     *            example bag per operator
+     * @param OperatorToEqClasses
+     *            equivalence classes per operator
+     * @param overallCompleteness
+     *            whether to average over all operators instead of {@code op}
+     * @return completeness in percent; 0 in overall mode when there is no
+     *         aliased operator
+     */
+    public static float getCompleteness(
+            LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData,
+            Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OperatorToEqClasses,
+            boolean overallCompleteness) {
+
+        if (!overallCompleteness) {
+            Collection<IdentityHashSet<Tuple>> eqClasses = OperatorToEqClasses
+                    .get(op);
+            int noCoveredClasses = countCoveredClasses(
+                    completenessBag(op, exampleData), eqClasses);
+            int noClasses = eqClasses.size();
+            if (noClasses == 0) {
+                // 0/0 would be NaN
+                return 0.0f;
+            }
+            return 100 * ((float) noCoveredClasses) / (float) noClasses;
+        }
+
+        float completeness = 0;
+        int noOperators = 0;
+        for (Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> e : OperatorToEqClasses
+                .entrySet()) {
+            LogicalOperator lop = e.getKey();
+            if (lop.getAlias() == null)
+                continue; // we want to consider join a single operator
+            noOperators++;
+            Collection<IdentityHashSet<Tuple>> eqClasses = e.getValue();
+            int noCoveredClasses = countCoveredClasses(
+                    completenessBag(lop, exampleData), eqClasses);
+            int noClasses = eqClasses.size();
+            if (noClasses != 0) {
+                completeness += 100 * ((float) noCoveredClasses / (float) noClasses);
+            }
+        }
+        if (noOperators == 0) {
+            // nothing to average; 0/0 would be NaN
+            return 0.0f;
+        }
+        return completeness / (float) noOperators;
+
+    }
+
+    // Picks the bag completeness is measured against: the input's bag for a
+    // filter, the operator's own bag otherwise.
+    private static DataBag completenessBag(LogicalOperator op,
+            Map<LogicalOperator, DataBag> exampleData) {
+        if (op instanceof LOFilter)
+            return exampleData.get(((LOFilter) op).getInput());
+        return exampleData.get(op);
+    }
+
+    // Number of equivalence classes flagged as covered for the given bag.
+    private static int countCoveredClasses(DataBag bag,
+            Collection<IdentityHashSet<Tuple>> eqClasses) {
+        int noCoveredClasses = 0;
+        for (Map.Entry<Integer, Boolean> e : getCompletenessLogic(bag,
+                eqClasses).entrySet()) {
+            if (e.getValue()) {
+                noCoveredClasses++;
+            }
+        }
+        return noCoveredClasses;
+    }
+
+    // Maps the position of each covered equivalence class to true. A class
+    // counts as covered when it contains a tuple of the bag or when it is
+    // empty; since classes are examined once per bag tuple, an empty bag
+    // covers nothing.
+    private static Map<Integer, Boolean> getCompletenessLogic(DataBag bag,
+            Collection<IdentityHashSet<Tuple>> eqClasses) {
+        Map<Integer, Boolean> coveredClasses = new HashMap<Integer, Boolean>();
+
+        for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            int classId = 0;
+            for (IdentityHashSet<Tuple> eqClass : eqClasses) {
+
+                if (eqClass.contains(t) || eqClass.size() == 0) {
+                    coveredClasses.put(classId, true);
+                }
+                classId++;
+            }
+        }
+
+        return coveredClasses;
+
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java?rev=699505&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java Fri Sep 26 14:23:35 2008
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A plan walker that starts at the leaves of the plan and works its way
+ * towards the predecessors, depth first. Each operator is visited before its
+ * predecessors (pre-order) and at most once.
+ */
+public class PreOrderDepthFirstWalker<O extends Operator, P extends OperatorPlan<O>>
+        extends PlanWalker<O, P> {
+    /**
+     * Creates a walker for the given plan.
+     *
+     * @param plan
+     *            Plan for this walker to traverse.
+     */
+    public PreOrderDepthFirstWalker(P plan) {
+        super(plan);
+    }
+
+    /**
+     * Traverses the graph, beginning at the plan's leaves.
+     *
+     * @param visitor
+     *            Visitor this walker is being used by.
+     * @throws VisitorException
+     *             if an error is encountered while walking.
+     */
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        visitUpstream(mPlan.getLeaves(), new HashSet<O>(), visitor);
+    }
+
+    /**
+     * Creates the walker used for nested plans; this is a plain
+     * {@link DepthFirstWalker}.
+     *
+     * @param plan
+     *            Plan for the child walker to traverse.
+     */
+    public PlanWalker<O, P> spawnChildWalker(P plan) {
+        return new DepthFirstWalker<O, P>(plan);
+    }
+
+    // Visits every not-yet-visited operator in "frontier" and then recurses
+    // into that operator's predecessors. A null frontier means there is
+    // nothing further upstream.
+    private void visitUpstream(Collection<O> frontier, Set<O> visited,
+            PlanVisitor<O, P> visitor) throws VisitorException {
+        if (frontier != null) {
+            for (O current : frontier) {
+                if (!visited.add(current)) {
+                    // already handled via another path
+                    continue;
+                }
+                current.visit(visitor);
+                visitUpstream(mPlan.getPredecessors(current), visited, visitor);
+            }
+        }
+    }
+}