Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC

svn commit: r1598702 [14/23] - in /pig/trunk: ./ ivy/ shims/src/hadoop23/org/apache/pig/backend/hadoop23/ shims/test/hadoop20/org/apache/pig/test/ shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/ src/org/apache/pig/ src/org/apache/pig/ba...

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Fri May 30 19:07:23 2014
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ColumnChainInfo;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ColumnInfo;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SortKeyInfo;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerUtil {
+    private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
+
+    private SecondaryKeyOptimizerUtil() {
+
+    }
+
+    private static class POToChange {
+        POToChange(PhysicalOperator oper, PhysicalPlan plan, POForEach forEach) {
+            this.oper = oper;
+            this.plan = plan;
+            this.forEach = forEach;
+        }
+
+        PhysicalOperator oper;
+
+        PhysicalPlan plan;
+
+        POForEach forEach;
+    }
+
+    public static class SecondaryKeyOptimizerInfo {
+
+        private int numSortRemoved = 0;
+        private int numDistinctChanged = 0;
+        private int numUseSecondaryKey = 0;
+        private boolean useSecondaryKey = false;
+        private boolean[] secondarySortOrder;
+
+        public int getNumSortRemoved() {
+            return numSortRemoved;
+        }
+
+        public void incrementNumSortRemoved() {
+            this.numSortRemoved++;
+        }
+
+        public int getNumDistinctChanged() {
+            return numDistinctChanged;
+        }
+
+        public void incrementNumDistinctChanged() {
+            this.numDistinctChanged++;
+        }
+
+        public int getNumUseSecondaryKey() {
+            return numUseSecondaryKey;
+        }
+
+        public void incrementNumUseSecondaryKey() {
+            this.numUseSecondaryKey++;
+        }
+
+        public boolean isUseSecondaryKey() {
+            return useSecondaryKey;
+        }
+
+        public void setUseSecondaryKey(boolean useSecondaryKey) {
+            this.useSecondaryKey = useSecondaryKey;
+        }
+
+        public boolean[] getSecondarySortOrder() {
+            return secondarySortOrder;
+        }
+
+        public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+            this.secondarySortOrder = secondarySortOrder;
+        }
+
+    }
+
+    // Build sort key structure from POLocalRearrange
+    private static SortKeyInfo getSortKeyInfo(POLocalRearrange rearrange) throws ExecException {
+        SortKeyInfo result = new SortKeyInfo();
+        List<PhysicalPlan> plans = rearrange.getPlans();
+        nextPlan: for (int i = 0; i < plans.size(); i++) {
+            PhysicalPlan plan = plans.get(i);
+            ColumnChainInfo columnChainInfo = new ColumnChainInfo();
+            if (plan.getRoots() == null) {
+                log.debug("POLocalRearrange plan is null");
+                return null;
+            } else if (plan.getRoots().size() != 1) {
+                // The POLocalRearrange plan contains more than one root.
+                // There is probably an expression operator in the local
+                // rearrange plan, so skip this plan.
+                continue nextPlan;
+            } else {
+                List<Integer> columns = new ArrayList<Integer>();
+                columns
+                        .add(rearrange.getIndex()
+                                & PigNullableWritable.idxSpace);
+
+                // The first item inside columnChainInfo is set to type Tuple.
+                // This value is not actually in use, but it is intended to match
+                // the type of the POProject on the reduce side.
+                columnChainInfo.insert(columns, DataType.TUPLE);
+
+                PhysicalOperator node = plan.getRoots().get(0);
+                while (node != null) {
+                    if (node instanceof POProject) {
+                        POProject project = (POProject) node;
+                        if(project.isProjectToEnd()){
+                            columnChainInfo.insert(project.getStartCol(),
+                                    project.getResultType());
+                        }else {
+                            columnChainInfo.insert(
+                                    project.getColumns(), project.getResultType());
+                        }
+
+                        if (plan.getSuccessors(node) == null)
+                            node = null;
+                        else if (plan.getSuccessors(node).size() != 1) {
+                            log.debug(node + " has more than 1 successor");
+                            node = null;
+                        } else
+                            node = plan.getSuccessors(node).get(0);
+                    } else
+                        // constant or UDF, skip this plan
+                        continue nextPlan;
+                }
+            }
+            // Assume all main keys are sorted ascending. We could further
+            // optimize this to match one of the nested sort/distinct keys, since we do
+            // not really care how the cogroup keys are sorted; but that may no longer
+            // hold if we ever switch all the comparators to byte comparators, so just
+            // leave it as it is for now.
+            result.insertColumnChainInfo(i, columnChainInfo, true);
+        }
+        return result;
+    }
+
+    public static SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException {
+        log.trace("Entering SecondaryKeyOptimizerUtil.addSecondaryKeySort");
+        SecondaryKeyOptimizerInfo secKeyOptimizerInfo = new SecondaryKeyOptimizerInfo();
+        List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
+        SortKeyInfo secondarySortKeyInfo = null;
+        List<POToChange> sortsToRemove = null;
+        List<POToChange> distinctsToChange = null;
+
+
+        List<PhysicalOperator> mapLeaves = mapPlan.getLeaves();
+        if (mapLeaves == null || mapLeaves.size() != 1) {
+            log.debug("Expected map to have single leaf! Skip secondary key optimizing");
+            return null;
+        }
+        PhysicalOperator mapLeaf = mapLeaves.get(0);
+
+        // Figure out the main key of the map-reduce job from POLocalRearrange
+        try {
+            if (mapLeaf instanceof POLocalRearrange) {
+                SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) mapLeaf);
+                if (sortKeyInfo == null) {
+                    log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+                    return null;
+                }
+                sortKeyInfos.add(sortKeyInfo);
+            } else if (mapLeaf instanceof POUnion) {
+                List<PhysicalOperator> preds = mapPlan
+                        .getPredecessors(mapLeaf);
+                for (PhysicalOperator pred : preds) {
+                    if (pred instanceof POLocalRearrange) {
+                        SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) pred);
+                        if (sortKeyInfo == null) {
+                            log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+                            return null;
+                        }
+                        sortKeyInfos.add(sortKeyInfo);
+                    }
+                }
+            } else {
+                log.debug("Cannot find POLocalRearrange or POUnion in map leaf, skip secondary key optimizing");
+                return null;
+            }
+        } catch (ExecException e) {
+            log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+            return null;
+        }
+
+        if (reducePlan.isEmpty()) {
+            log.debug("Reduce plan is empty, skip secondary key optimizing");
+            return null;
+        }
+
+        List<PhysicalOperator> reduceRoots = reducePlan.getRoots();
+        if (reduceRoots.size() != 1) {
+            log.debug("Expected reduce to have single root, skip secondary key optimizing");
+            return null;
+        }
+
+        PhysicalOperator root = reduceRoots.get(0);
+        if (!(root instanceof POPackage)) {
+            log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+            return null;
+        }
+
+        // Walk down to the POForEach of the reduce plan. There may be Limit and
+        // Filter operators in between.
+        PhysicalOperator currentNode = root;
+        POForEach foreach = null;
+        while (currentNode != null) {
+            if (currentNode instanceof POPackage
+                    && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager)
+                    || currentNode instanceof POFilter
+                    || currentNode instanceof POLimit) {
+                List<PhysicalOperator> succs = reducePlan
+                        .getSuccessors(currentNode);
+                if (succs == null) // We didn't find POForEach
+                    return null;
+                if (succs.size() != 1) {
+                    log.debug("See multiple output for " + currentNode
+                            + " in reduce plan, skip secondary key optimizing");
+                    return null;
+                }
+                currentNode = succs.get(0);
+            } else if (currentNode instanceof POForEach) {
+                foreach = (POForEach) currentNode;
+                break;
+            } else { // Skip optimization
+                return null;
+            }
+        }
+
+        // We did not find a foreach (we should never get here; this check only satisfies findbugs)
+        if (foreach==null)
+            return null;
+
+        sortsToRemove = new ArrayList<POToChange>();
+        distinctsToChange = new ArrayList<POToChange>();
+
+        for (PhysicalPlan innerPlan : foreach.getInputPlans()) {
+            // visit inner plans to figure out the sort order for distinct /
+            // sort
+            SecondaryKeyDiscover innerPlanDiscover = new SecondaryKeyDiscover(
+                    innerPlan, sortKeyInfos, secondarySortKeyInfo);
+            try {
+                innerPlanDiscover.process();
+            } catch (FrontendException e) {
+                int errorCode = 2213;
+                throw new VisitorException("Error visiting inner plan for ForEach", errorCode, e);
+            }
+            secondarySortKeyInfo = innerPlanDiscover.getSecondarySortKeyInfo();
+            if (innerPlanDiscover.getSortsToRemove() != null) {
+                for (POSort sort : innerPlanDiscover.getSortsToRemove()) {
+                    sortsToRemove.add(new POToChange(sort, innerPlan, foreach));
+                }
+            }
+            if (innerPlanDiscover.getDistinctsToChange() != null) {
+                for (PODistinct distinct : innerPlanDiscover
+                        .getDistinctsToChange()) {
+                    distinctsToChange.add(new POToChange(distinct, innerPlan,
+                            foreach));
+                }
+            }
+        }
+
+        try {
+            // Change PODistinct to POSortedDistinct, which assumes the input
+            // data is sorted.
+            for (POToChange distinctToChange : distinctsToChange) {
+                secKeyOptimizerInfo.incrementNumDistinctChanged();
+                PODistinct oldDistinct = (PODistinct) distinctToChange.oper;
+                String scope = oldDistinct.getOperatorKey().scope;
+                POSortedDistinct newDistinct = new POSortedDistinct(
+                        new OperatorKey(scope, NodeIdGenerator.getGenerator()
+                                .getNextNodeId(scope)), oldDistinct
+                                .getRequestedParallelism(), oldDistinct
+                                .getInputs());
+                newDistinct.setInputs(oldDistinct.getInputs());
+                newDistinct.setResultType(oldDistinct.getResultType());
+                distinctToChange.plan.replace(oldDistinct, newDistinct);
+                distinctToChange.forEach.getLeaves();
+            }
+            // Remove the POSort; if the successor requires a databag, we need to
+            // add a PORelationToExprProject to convert the tuples into a databag.
+            for (POToChange sortToRemove : sortsToRemove) {
+                secKeyOptimizerInfo.incrementNumSortRemoved();
+                POSort oldSort = (POSort) sortToRemove.oper;
+                String scope = oldSort.getOperatorKey().scope;
+                List<PhysicalOperator> preds = sortToRemove.plan
+                        .getPredecessors(sortToRemove.oper);
+                List<PhysicalOperator> succs = sortToRemove.plan
+                .getSuccessors(sortToRemove.oper);
+                POProject project = null;
+                if ((preds == null
+                        || preds.get(0).getResultType() != DataType.BAG
+                        && oldSort.getResultType() == DataType.BAG) // sort to remove do change the result type
+                        && (succs == null || !(succs.get(0) instanceof PORelationToExprProject))) // successor is not PORelationToExprProject
+                {
+                    project = new PORelationToExprProject(new OperatorKey(
+                            scope, NodeIdGenerator.getGenerator()
+                                    .getNextNodeId(scope)), oldSort
+                            .getRequestedParallelism());
+                    project.setInputs(oldSort.getInputs());
+                    project.setResultType(DataType.BAG);
+                    project.setStar(true);
+                }
+                if (project == null)
+                    sortToRemove.plan.removeAndReconnect(sortToRemove.oper);
+                else
+                    sortToRemove.plan.replace(oldSort, project);
+                sortToRemove.forEach.getLeaves();
+            }
+        } catch (PlanException e) {
+            int errorCode = 2202;
+            throw new VisitorException(
+                    "Error change distinct/sort to use secondary key optimizer",
+                    errorCode, e);
+        }
+        if (secondarySortKeyInfo != null) {
+            // Adjust POLocalRearrange, POPackage, MapReduceOper to use the
+            // secondary key
+            secKeyOptimizerInfo.incrementNumUseSecondaryKey();
+            secKeyOptimizerInfo.setUseSecondaryKey(true);
+            secKeyOptimizerInfo.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
+            int indexOfRearrangeToChange = -1;
+            for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo
+                    .getColumnChains()) {
+                ColumnInfo currentColumn = columnChainInfo.getColumnInfos()
+                        .get(0);
+                int index = currentColumn.getColumns().get(0);
+                if (indexOfRearrangeToChange == -1)
+                    indexOfRearrangeToChange = index;
+                else if (indexOfRearrangeToChange != index) {
+                    int errorCode = 2203;
+                    throw new VisitorException("Sort on columns from different inputs.", errorCode);
+                }
+            }
+            if (mapLeaf instanceof POLocalRearrange) {
+                ((POLocalRearrange) mapLeaf).setUseSecondaryKey(true);
+                setSecondaryPlan(mapPlan, (POLocalRearrange) mapLeaf,
+                        secondarySortKeyInfo);
+            } else if (mapLeaf instanceof POUnion) {
+                List<PhysicalOperator> preds = mapPlan
+                        .getPredecessors(mapLeaf);
+                boolean found = false;
+                for (PhysicalOperator pred : preds) {
+                    POLocalRearrange rearrange = (POLocalRearrange) pred;
+                    rearrange.setUseSecondaryKey(true);
+                    if (rearrange.getIndex() == indexOfRearrangeToChange) {
+                        // Try to find the POLocalRearrange for the secondary key
+                        found = true;
+                        setSecondaryPlan(mapPlan, rearrange, secondarySortKeyInfo);
+                    }
+                }
+                if (!found)
+                {
+                    int errorCode = 2214;
+                    throw new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
+                }
+            }
+            POPackage pack = (POPackage) root;
+            pack.getPkgr().setUseSecondaryKey(true);
+        }
+        return secKeyOptimizerInfo;
+    }
+
+    private static void setSecondaryPlan(PhysicalPlan plan, POLocalRearrange rearrange,
+            SortKeyInfo secondarySortKeyInfo) throws VisitorException {
+        // Attach plans that project the secondary key to the POLocalRearrange
+        try {
+            String scope = rearrange.getOperatorKey().scope;
+            List<PhysicalPlan> secondaryPlanList = new ArrayList<PhysicalPlan>();
+            for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo
+                    .getColumnChains()) {
+                PhysicalPlan secondaryPlan = new PhysicalPlan();
+                for (int i = 1; i < columnChainInfo.size(); i++) {
+                    // The first item in columnChainInfo indicates the index of the
+                    // input, which we have already addressed before getting here.
+                    ColumnInfo columnInfo = columnChainInfo.getColumnInfo(i);
+                    POProject project = new POProject(
+                            new OperatorKey(scope, NodeIdGenerator
+                                    .getGenerator().getNextNodeId(scope)),
+                            rearrange.getRequestedParallelism());
+                    if(columnInfo.isRangeProject())
+                        project.setProjectToEnd(columnInfo.getStartCol());
+                    else
+                        project
+                                .setColumns((ArrayList<Integer>) columnInfo.getColumns());
+                    project.setResultType(columnInfo.getResultType());
+                    secondaryPlan.addAsLeaf(project);
+                }
+                if (secondaryPlan.isEmpty()) { // If the secondary key sorts on the
+                                               // input as a whole
+                    POProject project = new POProject(
+                            new OperatorKey(scope, NodeIdGenerator
+                                    .getGenerator().getNextNodeId(scope)),
+                            rearrange.getRequestedParallelism());
+                    project.setStar(true);
+                    secondaryPlan.addAsLeaf(project);
+                }
+                secondaryPlanList.add(secondaryPlan);
+            }
+            rearrange.setSecondaryPlans(secondaryPlanList);
+        } catch (PlanException e) {
+            int errorCode = 2204;
+            throw new VisitorException("Error setting secondary key plan",
+                    errorCode, e);
+        }
+    }
+
+    // Find eligible sort and distinct physical operators in the reduce plan.
+    // SecondaryKeyDiscover checks the sort/distinct keys (for a distinct, the key
+    // is always the entire bag): if a key is the same as the main key, then the
+    // sort/distinct can simply be removed. If it is not, we can have one secondary
+    // sort key, so the first such sort/distinct key becomes the secondary sort key.
+    // For subsequent sorts/distincts we cannot do any secondary key optimization,
+    // because we only have one secondary sort key.
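+    //
+    // As an illustrative (hypothetical) example, this is the kind of Pig Latin
+    // pattern that benefits: a nested ORDER inside a FOREACH after a GROUP:
+    //     B = GROUP A BY k;
+    //     C = FOREACH B {
+    //         S = ORDER A BY v;
+    //         GENERATE group, S;
+    //     }
+    // Here the nested ORDER BY v can be satisfied by the secondary sort key
+    // instead of sorting each bag in memory on the reducer.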
+    private static class SecondaryKeyDiscover {
+        PhysicalPlan mPlan;
+
+        List<POSort> sortsToRemove = new ArrayList<POSort>();
+
+        List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();
+
+        List<SortKeyInfo> sortKeyInfos;
+
+        SortKeyInfo secondarySortKeyInfo;
+
+        ColumnChainInfo columnChainInfo = null;
+
+        // PhysicalPlan here is foreach inner plan
+        SecondaryKeyDiscover(PhysicalPlan plan,
+                List<SortKeyInfo> sortKeyInfos, SortKeyInfo secondarySortKeyInfo) {
+            this.mPlan = plan;
+            this.sortKeyInfos = sortKeyInfos;
+            this.secondarySortKeyInfo = secondarySortKeyInfo;
+        }
+
+        public void process() throws FrontendException
+        {
+            List<PhysicalOperator> roots = mPlan.getRoots();
+            for (PhysicalOperator root : roots) {
+                columnChainInfo = new ColumnChainInfo();
+                processRoot(root);
+            }
+        }
+
+        public void processRoot(PhysicalOperator root) throws FrontendException {
+            PhysicalOperator currentNode = root;
+            while (currentNode!=null) {
+                boolean sawInvalidPhysicalOper = false;
+                if (currentNode instanceof PODistinct)
+                    sawInvalidPhysicalOper = processDistinct((PODistinct)currentNode);
+                else if (currentNode instanceof POSort)
+                    sawInvalidPhysicalOper = processSort((POSort)currentNode);
+                else if (currentNode instanceof POProject)
+                    sawInvalidPhysicalOper = processProject((POProject)currentNode);
+                else if (currentNode instanceof POUserFunc ||
+                         currentNode instanceof POUnion ||
+                         // We don't process foreach, since foreach is too complex to get right
+                         currentNode instanceof POForEach)
+                    break;
+
+                if (sawInvalidPhysicalOper)
+                    break;
+
+                List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
+                if (succs==null)
+                    currentNode = null;
+                else {
+                    if (succs.size()>1) {
+                        int errorCode = 2215;
+                        throw new FrontendException("See more than 1 successors in the nested plan for "+currentNode,
+                                errorCode);
+                    }
+                    currentNode = succs.get(0);
+                }
+            }
+        }
+
+        // We see PODistinct, check which key it is using
+        public boolean processDistinct(PODistinct distinct) throws FrontendException {
+            SortKeyInfo keyInfos = new SortKeyInfo();
+            try {
+                keyInfos.insertColumnChainInfo(0,
+                        (ColumnChainInfo) columnChainInfo.clone(), true);
+            } catch (CloneNotSupportedException e) { // We implement Cloneable,
+                                                     // so it is impossible to get here
+            }
+
+            // if it is part of main key
+            for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
+                if (sortKeyInfo.moreSpecificThan(keyInfos)) {
+                    distinctsToChange.add(distinct);
+                    return false;
+                }
+            }
+
+            // if it is part of secondary key
+            if (secondarySortKeyInfo != null
+                    && secondarySortKeyInfo.moreSpecificThan(keyInfos)) {
+                distinctsToChange.add(distinct);
+                return false;
+            }
+
+            // Now set the secondary key
+            if (secondarySortKeyInfo == null) {
+                distinctsToChange.add(distinct);
+                secondarySortKeyInfo = keyInfos;
+            }
+            return false;
+        }
+
+        // Accumulate column info
+        public boolean processProject(POProject project) throws FrontendException {
+            columnChainInfo.insertInReduce(project);
+            return false;
+        }
+
+        // We see POSort, check which key it is using
+        public boolean processSort(POSort sort) throws FrontendException{
+            SortKeyInfo keyInfo = new SortKeyInfo();
+            for (int i = 0; i < sort.getSortPlans().size(); i++) {
+                PhysicalPlan sortPlan = sort.getSortPlans().get(i);
+                ColumnChainInfo sortChainInfo = null;
+                try {
+                    sortChainInfo = (ColumnChainInfo) columnChainInfo.clone();
+                } catch (CloneNotSupportedException e) { // We implement
+                                                         // Cloneable, so it is
+                                                         // impossible to get here
+                }
+                boolean r = false;
+                try {
+                    r = collectColumnChain(sortPlan, sortChainInfo);
+                } catch (PlanException e) {
+                    int errorCode = 2206;
+                    throw new FrontendException("Error visiting POSort inner plan",
+                            errorCode, e);
+                }
+                if (r==true) // if we saw a physical operator other than project in the sort plan
+                {
+                    return true;
+                }
+                keyInfo.insertColumnChainInfo(i, sortChainInfo, sort
+                        .getMAscCols().get(i));
+            }
+            // if it is part of main key
+            for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
+                if (sortKeyInfo.moreSpecificThan(keyInfo)) {
+                    sortsToRemove.add(sort);
+                    return false;
+                }
+            }
+            // if it is part of secondary key
+            if (secondarySortKeyInfo != null
+                    && secondarySortKeyInfo.moreSpecificThan(keyInfo)) {
+                sortsToRemove.add(sort);
+                return false;
+            }
+
+            // Now set the secondary key
+            if (secondarySortKeyInfo == null) {
+                sortsToRemove.add(sort);
+                secondarySortKeyInfo = keyInfo;
+            }
+            return false;
+        }
+
+        public List<POSort> getSortsToRemove() {
+            return sortsToRemove;
+        }
+
+        public List<PODistinct> getDistinctsToChange() {
+            return distinctsToChange;
+        }
+
+        public SortKeyInfo getSecondarySortKeyInfo() {
+            return secondarySortKeyInfo;
+        }
+    }
+
+    // Return true if we saw physical operators other than project in the plan
+    static private boolean collectColumnChain(PhysicalPlan plan,
+            ColumnChainInfo columnChainInfo) throws PlanException {
+        if (plan.getRoots().size() != 1) {
+            return true;
+        }
+
+        PhysicalOperator currentNode = plan.getRoots().get(0);
+
+        while (currentNode != null) {
+            if (currentNode instanceof POProject) {
+                POProject project = (POProject) currentNode;
+                columnChainInfo.insertInReduce(project);
+            } else {
+                return true;
+            }
+            List<PhysicalOperator> succs = plan.getSuccessors(currentNode);
+            if (succs == null)
+                break;
+            if (succs.size() != 1) {
+                int errorCode = 2208;
+                throw new PlanException(
+                        "Exception visiting foreach inner plan", errorCode);
+            }
+            currentNode = succs.get(0);
+        }
+        return false;
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Fri May 30 19:07:23 2014
@@ -97,6 +97,7 @@ public class HadoopExecutableManager ext
         scriptLogDir = job.get("pig.streaming.log.dir", "_logs");
         
         // Save the taskid
+        // TODO Get an equivalent property in Tez mode (currently this returns null)
         taskId = job.get("mapred.task.id");
     }
     
@@ -129,6 +130,9 @@ public class HadoopExecutableManager ext
             super.close();
 
             // Copy the secondary outputs of the task to HDFS
+            if (this.scriptOutputDir==null) {
+                return;
+            }
             Path scriptOutputDir = new Path(this.scriptOutputDir);
             FileSystem fs = scriptOutputDir.getFileSystem(job);
             List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
@@ -171,7 +175,7 @@ public class HadoopExecutableManager ext
      *         HDFS, <code>false</code> otherwise
      */
     private boolean writeErrorToHDFS(int limit, String taskId) {
-        if (command.getPersistStderr()) {
+        if (command.getPersistStderr() && taskId != null) {
             int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
             return tipId < command.getLogFilesLimit();
         }

Modified: pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/trunk/src/org/apache/pig/data/BinInterSedes.java Fri May 30 19:07:23 2014
@@ -688,10 +688,6 @@ public class BinInterSedes implements In
 
         @Override
         public void setConf(Configuration conf) {
-            if (!(conf instanceof JobConf)) {
-                mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
-                return;
-            }
             try {
                 mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
                 mSecondaryAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.secondarySortOrder"));

Modified: pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Fri May 30 19:07:23 2014
@@ -51,7 +51,7 @@ public abstract class DefaultAbstractBag
     // If we grow past 100K, may be worthwhile to register.
     private static final int SPILL_REGISTER_THRESHOLD = 100 * 1024;
 
-    private static PigLogger pigLogger = PhysicalOperator.getPigLogger();
+    private static PigLogger pigLogger;
 
     private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
     // Container that holds the tuples. Actual object instantiated by
@@ -85,7 +85,7 @@ public abstract class DefaultAbstractBag
 
 
     /**
-     * Sample every SPILL_SAMPLE_FREQUENCYth tuple 
+     * Sample every SPILL_SAMPLE_FREQUENCYth tuple
      * until we reach a max of SPILL_SAMPLE_SIZE
      * to get an estimate of the tuple sizes.
      */

Modified: pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultTuple.java Fri May 30 19:07:23 2014
@@ -230,13 +230,8 @@ public class DefaultTuple extends Abstra
 
         @Override
         public void setConf(Configuration conf) {
-            if (!(conf instanceof JobConf)) {
-                mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
-                return;
-            }
-            JobConf jconf = (JobConf) conf;
             try {
-                mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+                mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
             } catch (IOException ioe) {
                 mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
                 throw new RuntimeException(ioe);

Modified: pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/ReadOnceBag.java Fri May 30 19:07:23 2014
@@ -38,15 +38,15 @@ public class ReadOnceBag implements Data
 
     // The Packager that created this
     protected Packager pkgr;
-    
+
     //The iterator of Tuples. Marked transient because we will never serialize this.
     protected transient Iterator<NullableTuple> tupIter;
-    
+
     // The key being worked on
     protected PigNullableWritable keyWritable;
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;
 
@@ -75,7 +75,7 @@ public class ReadOnceBag implements Data
 
     /* (non-Javadoc)
      * @see org.apache.pig.impl.util.Spillable#spill()
-  
+
      */
     @Override
     public long spill() {
@@ -87,7 +87,7 @@ public class ReadOnceBag implements Data
      */
     @Override
     public void add(Tuple t) {
-        throw new RuntimeException("ReadOnceBag does not support add operation");		
+        throw new RuntimeException("ReadOnceBag does not support add operation");
     }
 
     /* (non-Javadoc)
@@ -131,7 +131,7 @@ public class ReadOnceBag implements Data
     }
 
     /* (non-Javadoc)
-	 * @see org.apache.pig.data.DataBag#markStale(boolean)
+     * @see org.apache.pig.data.DataBag#markStale(boolean)
      */
     @Override
     public void markStale(boolean stale) {
@@ -166,7 +166,7 @@ public class ReadOnceBag implements Data
 
     /* (non-Javadoc)
      * @see java.lang.Comparable#compareTo(java.lang.Object)
-     * This has to be defined since DataBag implements 
+     * This has to be defined since DataBag implements
      * Comparable although, in this case we cannot really compare.
      */
     @Override
@@ -202,7 +202,7 @@ public class ReadOnceBag implements Data
 
     @Override
     public int hashCode() {
-    	int hash = 7;
+        int hash = 7;
         if (pkgr.getKeyTuple())
         {
             hash = hash * 31 + pkgr.getKeyAsTuple().hashCode();
@@ -236,18 +236,18 @@ public class ReadOnceBag implements Data
                 ret = pkgr.getValueTuple(keyWritable, ntup, index);
             } catch (ExecException e)
             {
-            	throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString());
+                throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString());
             }
             return ret;
         }
-		
+
         /* (non-Javadoc)
          * @see java.util.Iterator#remove()
          */
         @Override
         public void remove() {
-            throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");    
+            throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
         }
-	}
+    }
 }
 

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Fri May 30 19:07:23 2014
@@ -148,7 +148,7 @@ public class PigContext implements Seria
     private static ThreadLocal<ArrayList<String>> packageImportList =
         new ThreadLocal<ArrayList<String>>();
 
-    private static ThreadLocal<Map<String,Class<?>>> classCache = 
+    private static ThreadLocal<Map<String,Class<?>>> classCache =
         new ThreadLocal<Map<String,Class<?>>>();
 
     private Properties log4jProperties = new Properties();
@@ -242,14 +242,14 @@ public class PigContext implements Seria
         this(ExecType.MAPREDUCE, new Properties());
     }
 
-        public PigContext(Configuration conf) throws PigException {
-            this(ConfigurationUtil.toProperties(conf));
-        }
-        
-        public PigContext(Properties properties) throws PigException {
-            this(ExecTypeProvider.selectExecType(properties), properties);
-        }
-    
+    public PigContext(Configuration conf) throws PigException {
+        this(ConfigurationUtil.toProperties(conf));
+    }
+
+    public PigContext(Properties properties) throws PigException {
+        this(ExecTypeProvider.selectExecType(properties), properties);
+    }
+
     public PigContext(ExecType execType, Configuration conf) {
         this(execType, ConfigurationUtil.toProperties(conf));
     }
@@ -276,7 +276,7 @@ public class PigContext implements Seria
         skippedShipPaths.add("/sbin");
         skippedShipPaths.add("/usr/sbin");
         skippedShipPaths.add("/usr/local/sbin");
-        
+
         macros = new HashMap<String, Tree>();
         scriptingUDFs = new HashMap<String, String>();
 
@@ -376,10 +376,10 @@ public class PigContext implements Seria
     }
 
     /**
-     * Adds the specified path to the predeployed jars list. These jars will 
+     * Adds the specified path to the predeployed jars list. These jars will
      * never be included in generated job jar.
      * <p>
-     * This can be called for jars that are pre-installed on the Hadoop 
+     * This can be called for jars that are pre-installed on the Hadoop
      * cluster to reduce the size of the job jar.
      */
     public void markJarAsPredeployed(String path) {
@@ -644,24 +644,24 @@ public class PigContext implements Seria
             c = new HashMap<String,Class<?>>();
             classCache.set(c);
         }
-             
+
         return c;
     }
-    
+
     @SuppressWarnings("rawtypes")
     public static Class resolveClassName(String name) throws IOException{
-        Map<String,Class<?>> cache = getClassCache(); 
-        
+        Map<String,Class<?>> cache = getClassCache();
+
         Class c = cache.get(name);
         if (c != null) {
             return c;
         }
-        
+
         for(String prefix: getPackageImportList()) {
             try {
                 c = Class.forName(prefix+name,true, PigContext.classloader);
                 cache.put(name, c);
-                
+
                 return c;
             }
             catch (ClassNotFoundException e) {
@@ -868,11 +868,7 @@ public class PigContext implements Seria
      * @return error source
      */
     public byte getErrorSource() {
-        if(execType == ExecType.LOCAL || execType == ExecType.MAPREDUCE) {
-            return PigException.REMOTE_ENVIRONMENT;
-        } else {
-            return PigException.BUG;
-        }
+        return PigException.REMOTE_ENVIRONMENT;
     }
 
     public static ArrayList<String> getPackageImportList() {

Modified: pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Fri May 30 19:07:23 2014
@@ -40,278 +40,268 @@ import org.apache.pig.impl.util.Pair;
  * sampling process. It figures out how many reducers are required to process a
  * skewed key without causing spill and allocates this number of reducers to the
  * key. This UDF outputs a map which contains 2 keys:
- * 
+ *
  * <li>&quot;totalreducers&quot;: the value is an integer wich indicates the
  *         number of total reducers for this join job </li>
  * <li>&quot;partition.list&quot;: the value is a bag which contains a
  * list of tuples with each tuple representing partitions for a skewed key.
- * The tuple has format of &lt;join key&gt;,&lt;min index of reducer&gt;, 
+ * The tuple has the format of &lt;join key&gt;,&lt;min index of reducer&gt;,
  * &lt;max index of reducer&gt; </li>
- * 
- * For example, a join job configures 10 reducers, and the sampling process 
+ *
+ * For example, a join job configures 10 reducers, and the sampling process
  * finds out 2 skewed keys, &quot;swpv&quot; needs 4 reducers and &quot;swps&quot;
  * needs 2 reducers. The output file would look like the following:
- * 
+ *
  * {totalreducers=10, partition.list={(swpv,0,3), (swps,4,5)}}
  *
- * The name of this file is set into next MR job which does the actual join. 
+ * The name of this file is set in the next MR job, which does the actual join.
  * That job uses this information to partition skewed keys properly
- * 
+ *
  */
 
 public class PartitionSkewedKeys extends EvalFunc<Map<String, Object>> {
 
-	public static final String PARTITION_LIST = "partition.list";
+    public static final String PARTITION_LIST = "partition.list";
 
-	public static final String TOTAL_REDUCERS = "totalreducers";
+    public static final String TOTAL_REDUCERS = "totalreducers";
 
-	public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
+    public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
 
-	private Log log = LogFactory.getLog(getClass());
+    private Log log = LogFactory.getLog(getClass());
 
-	BagFactory mBagFactory = BagFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
 
-	TupleFactory mTupleFactory = TupleFactory.getInstance();
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
 
-	private int currentIndex_;
+    private int currentIndex_;
 
-	private int totalReducers_;
+    private int totalReducers_;
 
-	private long totalMemory_;
+    private long totalMemory_;
 
-	private String inputFile_;
+    private long totalSampleCount_;
 
-	private long totalSampleCount_;
+    private double heapPercentage_;
 
-	private double heapPercentage_;
-	
     // specify how many tuples a reducer can hold for a key
     // this is for testing purposes. If not specified, then
     // it is calculated based on the memory size and the size of a tuple
-	private int tupleMCount_; 
+    private int tupleMCount_;
 
-	public PartitionSkewedKeys() {
-		this(null);
-	}
-
-	public PartitionSkewedKeys(String[] args) {
-		totalReducers_ = -1;
-		currentIndex_ = 0;
-
-		if (args != null && args.length > 0) {
-			heapPercentage_ = Double.parseDouble(args[0]);
-			tupleMCount_ = Integer.parseInt(args[1]);
-			inputFile_ = args[2];			
-		} else {
-			heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
-		}
-		
-		if (log.isDebugEnabled()) {
-			log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
-			log.debug("input file: " + inputFile_);
-		}
-		
-		log.info("input file: " + inputFile_);
-
-	}
-
-	/**
-	 * first field in the input tuple is the number of reducers
-	 * 
-	 * second field is the *sorted* bag of samples
-	 * this should be called only once
-	 */
-	public Map<String, Object> exec(Tuple in) throws IOException {
-	    if (in == null || in.size() == 0) {                     
-	        return null;
-	    }
-	    Map<String, Object> output = new HashMap<String, Object>();
-
-	    totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
-	    log.info("Maximum of available memory is " + totalMemory_);
-
-	    ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
-
-	    Tuple currentTuple = null;
-	    long count = 0;
-	    
-	    // total size in memory for tuples in sample 
-	    long totalSampleMSize = 0;
-	    
-	    //total input rows for the join
-	    long totalInputRows = 0;
-	    
-	    try {
-	        totalReducers_ = (Integer) in.get(0);
-	        DataBag samples = (DataBag) in.get(1);
-
-	        totalSampleCount_ = samples.size();
-
-	        log.info("totalSample: " + totalSampleCount_);
-	        log.info("totalReducers: " + totalReducers_);			
-
-	        int maxReducers = 0;
-
-	        // first iterate the samples to find total number of rows
-	        Iterator<Tuple> iter1 = samples.iterator();
-	        while (iter1.hasNext()) {
-	            Tuple t = iter1.next();
-	            totalInputRows += (Long)t.get(t.size() - 1);
-	        }
-	                        
-	        // now iterate samples to do the reducer calculation
-	        Iterator<Tuple> iter2 = samples.iterator();
-	        while (iter2.hasNext()) {
-	            Tuple t = iter2.next();
-	            if (hasSameKey(currentTuple, t) || currentTuple == null) {
-	                count++;
-	                totalSampleMSize += getMemorySize(t);
-	            } else {
-	                Pair<Tuple, Integer> p = calculateReducers(currentTuple,
-	                        count, totalSampleMSize, totalInputRows);
-	                Tuple rt = p.first;
-	                if (rt != null) {
-	                    reducerList.add(rt);
-	                }
-	                if (maxReducers < p.second) {
-	                    maxReducers = p.second;
-	                }
-	                count = 1;
-	                totalSampleMSize = getMemorySize(t);
-	            }
-
-	            currentTuple = t;
-	        }
-
-	        // add last key
-	        if (count > 0) {
-	            Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
-	                    totalSampleMSize, totalInputRows);
-	            Tuple rt = p.first;
-	            if (rt != null) {
-	                reducerList.add(rt);
-	            }
-	            if (maxReducers < p.second) {
-	                maxReducers = p.second;
-	            }
-	        }
-
-	        if (maxReducers > totalReducers_) {
-	            if(pigLogger != null) {
-	                pigLogger.warn(this,"You need at least " + maxReducers
-	                        + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
-	            } else {
-	                log.warn("You need at least " + maxReducers
-	                        + " reducers to avoid spillage and run this job efficiently.");
-	            }
-	        }
-
-	        output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
-	        output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
-
-	        log.info(output.toString());
-	        if (log.isDebugEnabled()) {
-	            log.debug(output.toString());
-	        }
-
-	        return output;
-	    } catch (Exception e) {
-	        e.printStackTrace();
-	        throw new RuntimeException(e);
-	    }
-	}
-
-	private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
-	        long count, long totalMSize, long totalTuples) {
-	    // get average memory size per tuple
-	    double avgM = totalMSize / (double) count;
-
-	    // get the number of tuples that can fit into memory
-	    long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
-
-	    // estimate the number of total tuples for this key
-            long keyTupleCount = (long)  ( ((double) count/ totalSampleCount_) * 
-                        totalTuples);
-                            
-	    
-	    int redCount = (int) Math.round(Math.ceil((double) keyTupleCount
-	            / tupleMCount));
-
-	    if (log.isDebugEnabled()) 
-	    {
-	        log.debug("avgM: " + avgM);
-	        log.debug("tuple count: " + keyTupleCount);
-	        log.debug("count: " + count);
-	        log.debug("A reducer can take " + tupleMCount + " tuples and "
-	                + keyTupleCount + " tuples are find for " + currentTuple);
-	        log.debug("key " + currentTuple + " need " + redCount + " reducers");
-	    }
-
-	    // this is not a skewed key
-	    if (redCount <= 1) {
-	        return new Pair<Tuple, Integer>(null, 1);
-	    }
-
-	    Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
-	    int i = 0;
-	    try {
-	        // set keys
-	        for (; i < currentTuple.size() - 2; i++) {
-	            t.set(i, currentTuple.get(i));
-	        }
-
-	        int effectiveRedCount = redCount > totalReducers_? totalReducers_:redCount;
-	        // set the min index of reducer for this key
-	        t.set(i++, currentIndex_);
-	        currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
-	        if (currentIndex_ < 0) {
-	            currentIndex_ += totalReducers_;
-	        }
-	        // set the max index of reducer for this key
-	        t.set(i++, currentIndex_);
-	    } catch (ExecException e) {
-	        throw new RuntimeException("Failed to set value to tuple." + e);
-	    }
-
-	    currentIndex_ = (currentIndex_ + 1) % totalReducers_;
-
-	    Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
-
-	    return p;
-	}
-
-	// the last field of the tuple is a tuple for memory size and disk size
-	private long getMemorySize(Tuple t) {
-	    int s = t.size();
-	    try {
-	        return (Long) t.get(s - 2);
-	    } catch (ExecException e) {
-	        throw new RuntimeException(
-	                "Unable to retrive the size field from tuple.", e);
-	    }
-	}
-
-
-	private boolean hasSameKey(Tuple t1, Tuple t2) {
-		// Have to break the tuple down and compare it field to field.
-		int sz1 = t1 == null ? 0 : t1.size();
-		int sz2 = t2 == null ? 0 : t2.size();
-		if (sz2 != sz1) {
-			return false;
-		}
-
-		for (int i = 0; i < sz1 - 2; i++) {
-			try {
-				int c = DataType.compare(t1.get(i), t2.get(i));
-				if (c != 0) {
-					return false;
-				}
-			} catch (ExecException e) {
-				throw new RuntimeException("Unable to compare tuples", e);
-			}
-		}
+    public PartitionSkewedKeys() {
+        this(null);
+    }
+
+    public PartitionSkewedKeys(String[] args) {
+        totalReducers_ = -1;
+        currentIndex_ = 0;
+
+        if (args != null && args.length > 0) {
+            heapPercentage_ = Double.parseDouble(args[0]);
+            tupleMCount_ = Integer.parseInt(args[1]);
+        } else {
+            heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
+        }
+    }
+
+    /**
+     * The first field in the input tuple is the number of reducers.
+     *
+     * The second field is the *sorted* bag of samples.
+     * This should be called only once.
+     */
+    public Map<String, Object> exec(Tuple in) throws IOException {
+        if (in == null || in.size() == 0) {
+            return null;
+        }
+        Map<String, Object> output = new HashMap<String, Object>();
+
+        totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
+        log.info("Maximum of available memory is " + totalMemory_);
+
+        ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
+
+        Tuple currentTuple = null;
+        long count = 0;
+
+        // total size in memory for tuples in sample
+        long totalSampleMSize = 0;
+
+        //total input rows for the join
+        long totalInputRows = 0;
+
+        try {
+            totalReducers_ = (Integer) in.get(0);
+            DataBag samples = (DataBag) in.get(1);
+
+            totalSampleCount_ = samples.size();
+
+            log.info("totalSample: " + totalSampleCount_);
+            log.info("totalReducers: " + totalReducers_);
+
+            int maxReducers = 0;
+
+            // first iterate the samples to find total number of rows
+            Iterator<Tuple> iter1 = samples.iterator();
+            while (iter1.hasNext()) {
+                Tuple t = iter1.next();
+                totalInputRows += (Long)t.get(t.size() - 1);
+            }
+
+            // now iterate samples to do the reducer calculation
+            Iterator<Tuple> iter2 = samples.iterator();
+            while (iter2.hasNext()) {
+                Tuple t = iter2.next();
+                if (hasSameKey(currentTuple, t) || currentTuple == null) {
+                    count++;
+                    totalSampleMSize += getMemorySize(t);
+                } else {
+                    Pair<Tuple, Integer> p = calculateReducers(currentTuple,
+                            count, totalSampleMSize, totalInputRows);
+                    Tuple rt = p.first;
+                    if (rt != null) {
+                        reducerList.add(rt);
+                    }
+                    if (maxReducers < p.second) {
+                        maxReducers = p.second;
+                    }
+                    count = 1;
+                    totalSampleMSize = getMemorySize(t);
+                }
+
+                currentTuple = t;
+            }
+
+            // add last key
+            if (count > 0) {
+                Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
+                        totalSampleMSize, totalInputRows);
+                Tuple rt = p.first;
+                if (rt != null) {
+                    reducerList.add(rt);
+                }
+                if (maxReducers < p.second) {
+                    maxReducers = p.second;
+                }
+            }
+
+            if (maxReducers > totalReducers_) {
+                if(pigLogger != null) {
+                    pigLogger.warn(this,"You need at least " + maxReducers
+                            + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
+                } else {
+                    log.warn("You need at least " + maxReducers
+                            + " reducers to avoid spillage and run this job efficiently.");
+                }
+            }
+
+            output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
+            output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
+
+            log.info(output.toString());
+            if (log.isDebugEnabled()) {
+                log.debug(output.toString());
+            }
+
+            return output;
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
+            long count, long totalMSize, long totalTuples) {
+        // get average memory size per tuple
+        double avgM = totalMSize / (double) count;
+
+        // get the number of tuples that can fit into memory
+        long tupleMCount = (tupleMCount_ <= 0) ? (long) (totalMemory_ / avgM) : tupleMCount_;
+
+        // estimate the number of total tuples for this key
+        long keyTupleCount = (long) (((double) count / totalSampleCount_) * totalTuples);
+
+        int redCount = (int) Math.round(Math.ceil((double) keyTupleCount / tupleMCount));
+
+        if (log.isDebugEnabled())
+        {
+            log.debug("avgM: " + avgM);
+            log.debug("tuple count: " + keyTupleCount);
+            log.debug("count: " + count);
+            log.debug("A reducer can take " + tupleMCount + " tuples and "
+                    + keyTupleCount + " tuples are find for " + currentTuple);
+            log.debug("key " + currentTuple + " need " + redCount + " reducers");
+        }
+
+        // this is not a skewed key
+        if (redCount <= 1) {
+            return new Pair<Tuple, Integer>(null, 1);
+        }
+
+        Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
+        int i = 0;
+        try {
+            // set keys
+            for (; i < currentTuple.size() - 2; i++) {
+                t.set(i, currentTuple.get(i));
+            }
+
+            int effectiveRedCount = redCount > totalReducers_? totalReducers_:redCount;
+            // set the min index of reducer for this key
+            t.set(i++, currentIndex_);
+            currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
+            if (currentIndex_ < 0) {
+                currentIndex_ += totalReducers_;
+            }
+            // set the max index of reducer for this key
+            t.set(i++, currentIndex_);
+        } catch (ExecException e) {
+            throw new RuntimeException("Failed to set value to tuple." + e);
+        }
+
+        currentIndex_ = (currentIndex_ + 1) % totalReducers_;
+
+        Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
+
+        return p;
+    }
+
+    // the second to last field of the sample tuple is its memory size (the last field is the row count)
+    private long getMemorySize(Tuple t) {
+        int s = t.size();
+        try {
+            return (Long) t.get(s - 2);
+        } catch (ExecException e) {
+            throw new RuntimeException(
+                    "Unable to retrive the size field from tuple.", e);
+        }
+    }
+
+
+    private boolean hasSameKey(Tuple t1, Tuple t2) {
+        // Have to break the tuple down and compare it field to field.
+        int sz1 = t1 == null ? 0 : t1.size();
+        int sz2 = t2 == null ? 0 : t2.size();
+        if (sz2 != sz1) {
+            return false;
+        }
+
+        for (int i = 0; i < sz1 - 2; i++) {
+            try {
+                int c = DataType.compare(t1.get(i), t2.get(i));
+                if (c != 0) {
+                    return false;
+                }
+            } catch (ExecException e) {
+                throw new RuntimeException("Unable to compare tuples", e);
+            }
+        }
 
-		return true;
-	}
+        return true;
+    }
 
 }
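
A note on the arithmetic behind calculateReducers() above: the average sample size
gives the number of tuples that fit in one reducer's memory budget, the sample
fraction scales up to an estimated tuple count for the key, and the ratio of the two
(rounded up) is the reducer count. A standalone sketch with hypothetical numbers
(illustrative only, not taken from this commit):

    // Sketch of the per-key reducer estimate used by PartitionSkewedKeys.
    public class SkewEstimateSketch {
        public static void main(String[] args) {
            long totalMemory = 100L * 1024 * 1024;  // assumed memory budget per reducer (100MB)
            long count = 50;                        // samples seen for this key
            long totalMSize = 50L * 1024;           // total in-memory size of those samples
            long totalSampleCount = 1000;           // samples across all keys
            long totalTuples = 10000000L;           // estimated total input rows

            double avgM = totalMSize / (double) count;         // 1024 bytes per tuple
            long tupleMCount = (long) (totalMemory / avgM);    // 102400 tuples fit in memory
            long keyTupleCount =
                (long) (((double) count / totalSampleCount) * totalTuples);       // 500000
            int redCount = (int) Math.ceil((double) keyTupleCount / tupleMCount); // 5

            System.out.println("key needs " + redCount + " reducers");
        }
    }

A key is treated as skewed only when this estimate exceeds 1; otherwise the null/1
pair is returned and the key is not added to the partition list.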

Modified: pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Fri May 30 19:07:23 2014
@@ -18,71 +18,57 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Properties;
-
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.util.Pair;
 
 /**
  * See "Skewed Join sampler" in http://wiki.apache.org/pig/PigSampler
  */
 public class PoissonSampleLoader extends SampleLoader {
-    
+
     // marker string to mark the last sample row, which has total number or rows
     // seen by this map instance
     // this string will be in the 2nd last column of the last sample row
     // it is used by GetMemNumRows
-    public static final String NUMROWS_TUPLE_MARKER = 
+    public static final String NUMROWS_TUPLE_MARKER =
         "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
-    
+
     //num of rows sampled so far
     private int numRowsSampled = 0;
-    
+
     //average size of tuple in memory, for tuples sampled
     private long avgTupleMemSz = 0;
-    
-    //current row number 
+
+    //current row number
     private long rowNum = 0;
-    
+
     // number of tuples to skip after each sample
-    long skipInterval = -1;
+    private long skipInterval = -1;
 
-    // bytes in input to skip after every sample. 
-    // divide this by avgTupleMemSize to get skipInterval 
+    // bytes in input to skip after every sample.
+    // divide this by avgTupleMemSize to get skipInterval
     private long memToSkipPerSample = 0;
-    
+
     // has the special row with row number information been returned
     private boolean numRowSplTupleReturned = false;
-    
-    /// For a given mean and a confidence, a sample rate is obtained from a poisson cdf
-    private static final String SAMPLE_RATE = "pig.sksampler.samplerate";
-    
+
     // 17 is not a magic number. It can be obtained by using a poisson cumulative distribution function with the mean
     // set to 10 (emperically, minimum number of samples) and the confidence set to 95%
     private static final int DEFAULT_SAMPLE_RATE = 17;
-        
+
     private int sampleRate = DEFAULT_SAMPLE_RATE;
-    
-    /// % of memory available for the input data. This is currenty equal to the memory available
-    /// for the skewed join
-    private static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
 
     private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
-    
+
     // new Sample tuple
     private Tuple newSample = null;
-        
-//  private final Log log = LogFactory.getLog(getClass());
-    
+
     public PoissonSampleLoader(String funcSpec, String ns) {
         super(funcSpec);
         super.setNumSamples(Integer.valueOf(ns)); // will be overridden
@@ -91,18 +77,18 @@ public class PoissonSampleLoader extends
     @Override
     public Tuple getNext() throws IOException {
         if(numRowSplTupleReturned){
-            // row num special row has been returned after all inputs 
-            // were read, nothing more to read 
+            // row num special row has been returned after all inputs
+            // were read, nothing more to read
             return null;
         }
 
-
         if(skipInterval == -1){
             //select first tuple as sample and calculate
-            // number of tuples to be skipped 
+            // number of tuples to be skipped
             Tuple t = loader.getNext();
-            if(t == null)
+            if(t == null) {
                 return createNumRowTuple(null);
+            }
             long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
             memToSkipPerSample = availRedMem/sampleRate;
             updateSkipInterval(t);
@@ -121,8 +107,9 @@ public class PoissonSampleLoader extends
 
         // skipped enough, get new sample
         Tuple t = loader.getNext();
-        if(t == null)
+        if(t == null) {
             return createNumRowTuple(newSample);
+        }
         updateSkipInterval(t);
         rowNum++;
         Tuple currentSample = newSample;
@@ -136,14 +123,14 @@ public class PoissonSampleLoader extends
      * @param t - tuple
      */
     private void updateSkipInterval(Tuple t) {
-        avgTupleMemSz = 
+        avgTupleMemSz =
             ((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
         skipInterval = memToSkipPerSample/avgTupleMemSz;
-        
-            // skipping fewer number of rows the first few times, to reduce 
-            // the probability of first tuples size (if much smaller than rest) 
-        // resulting in 
-            // very few samples being sampled. Sampling a little extra is OK
+
+        // skip fewer rows the first few times, to reduce the probability that
+        // the first tuple's size (if much smaller than the rest) results in
+        // very few samples being taken. Sampling a little extra is OK
         if(numRowsSampled < 5)
             skipInterval = skipInterval/(10-numRowsSampled);
         ++numRowsSampled;
@@ -157,21 +144,22 @@ public class PoissonSampleLoader extends
      */
     private Tuple createNumRowTuple(Tuple sample) throws ExecException {
         int sz = (sample == null) ? 0 : sample.size();
-        TupleFactory factory = TupleFactory.getInstance();       
+        TupleFactory factory = TupleFactory.getInstance();
         Tuple t = factory.newTuple(sz + 2);
- 
+
         if (sample != null) {
             for(int i=0; i<sample.size(); i++){
                 t.set(i, sample.get(i));
             }
         }
-        
+
         t.set(sz, NUMROWS_TUPLE_MARKER);
         t.set(sz + 1, rowNum);
         numRowSplTupleReturned = true;
         return t;
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
     public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
         super.prepareToRead(reader, split);
@@ -184,8 +172,9 @@ public class PoissonSampleLoader extends
         newSample = null;
 
         Configuration conf = split.getConf();
-        sampleRate = conf.getInt(SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
-        heapPerc = conf.getFloat(PERC_MEM_AVAIL, PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+        sampleRate = conf.getInt(PigConfiguration.SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
+        heapPerc = conf.getFloat(PigConfiguration.PERC_MEM_AVAIL,
+                PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
     }
 
 }
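
The skip-interval logic above is similarly small: the reducer memory budget is
divided by the sample rate to get a byte budget per sample, and that budget divided
by the running average tuple size gives the number of rows to skip between samples.
A sketch with hypothetical numbers (illustrative only):

    // Sketch of the PoissonSampleLoader skip-interval computation.
    public class SkipIntervalSketch {
        public static void main(String[] args) {
            double heapPerc = 0.3;                  // pig.skewedjoin.reduce.memusage
            int sampleRate = 17;                    // DEFAULT_SAMPLE_RATE
            long maxMemory = 1024L * 1024 * 1024;   // assume a 1GB reducer heap

            long availRedMem = (long) (maxMemory * heapPerc);        // ~322MB usable
            long memToSkipPerSample = availRedMem / sampleRate;      // ~18.9MB of input per sample

            long avgTupleMemSz = 200;                                // running average tuple size
            long skipInterval = memToSkipPerSample / avgTupleMemSz;  // 94741 rows skipped
            System.out.println("skip " + skipInterval + " rows between samples");
        }
    }

For the first five samples the interval is further divided by (10 - numRowsSampled),
so sampling starts denser and settles down once the average tuple size is reliable.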

Modified: pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java Fri May 30 19:07:23 2014
@@ -55,7 +55,15 @@ public class NullablePartitionWritable e
 		return partitionIndex;
 	}
 
-  	@Override
+    @Override
+    public NullablePartitionWritable clone() throws CloneNotSupportedException {
+        NullablePartitionWritable clone = new NullablePartitionWritable();
+        clone.setKey(this.getKey());
+        clone.partitionIndex = this.partitionIndex;
+        return clone;
+    }
+
+    @Override
     public int compareTo(Object o) {
 		return key.compareTo(((NullablePartitionWritable)o).getKey());
 	}

Modified: pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java Fri May 30 19:07:23 2014
@@ -42,6 +42,12 @@ public class NullableTuple extends PigNu
         mValue = t;
     }
 
+    public NullableTuple(NullableTuple copy) {
+        setNull(copy.isNull());
+        mValue = copy.mValue;
+        setIndex(copy.getIndex());
+    }
+
     @Override
     public Object getValueAsPigType() {
         return isNull() ? null : (Tuple)mValue;

Modified: pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java Fri May 30 19:07:23 2014
@@ -40,7 +40,7 @@ import org.apache.pig.data.Tuple;
 //Put in to make the compiler not complain about WritableComparable
 //being a generic type.
 @SuppressWarnings("unchecked")
-public abstract class PigNullableWritable implements WritableComparable {
+public abstract class PigNullableWritable implements WritableComparable, Cloneable {
 
     /**
      * indices in multiquery optimized maps
@@ -61,6 +61,19 @@ public abstract class PigNullableWritabl
 
     private byte mIndex;
 
+    @Override
+    public PigNullableWritable clone() throws CloneNotSupportedException {
+        try {
+            PigNullableWritable clone = this.getClass().newInstance();
+            clone.mNull = this.mNull;
+            clone.mValue = this.mValue;
+            clone.mIndex = this.mIndex;
+            return clone;
+        } catch (Exception e) {
+            throw new RuntimeException("Exception while cloning " + this, e);
+        }
+    }
+
     /**
      * Compare two nullable objects.  Step one is to check if either or both
      * are null.  If one is null and the other is not, then the one that is
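
The copy constructor and clone() methods added in these three files are all shallow:
the wrapped value is shared, while the null flag, the index, and (for
NullablePartitionWritable) the partition index are copied. A hypothetical usage
sketch, assuming the existing NullableTuple(Tuple) constructor and the
setIndex/getIndex accessors on PigNullableWritable:

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.pig.impl.io.NullableTuple;
    import org.apache.pig.impl.io.PigNullableWritable;

    public class CloneSketch {
        public static void main(String[] args) throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple(1);
            t.set(0, "value");

            NullableTuple original = new NullableTuple(t);
            original.setIndex((byte) 2);

            NullableTuple copy = new NullableTuple(original);  // new copy constructor
            PigNullableWritable cloned = original.clone();     // reflection-based shallow clone

            // Both share the underlying Tuple but carry their own null flag and index.
            System.out.println(copy.getIndex() == cloned.getIndex());  // true
        }
    }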

Modified: pig/trunk/src/org/apache/pig/impl/plan/NodeIdGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/NodeIdGenerator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/NodeIdGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/NodeIdGenerator.java Fri May 30 19:07:23 2014
@@ -18,37 +18,45 @@
 
 package org.apache.pig.impl.plan;
 
-import java.util.Map;
 import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
 
 public class NodeIdGenerator {
 
     private Map<String, Long> scopeToIdMap;
     private static NodeIdGenerator theGenerator = new NodeIdGenerator();
-    
+
     private NodeIdGenerator() {
         scopeToIdMap = new HashMap<String, Long>();
     }
-    
+
     public static NodeIdGenerator getGenerator() {
         return theGenerator;
     }
-    
+
     public long getNextNodeId(String scope) {
         Long val = scopeToIdMap.get(scope);
-        
+
         long nextId = 0;
-        
+
         if (val != null) {
             nextId = val.longValue();
         }
 
         scopeToIdMap.put(scope, nextId + 1);
-        
+
         return nextId;
     }
 
+    @VisibleForTesting
     public static void reset(String scope) {
         theGenerator.scopeToIdMap.put(scope, 0L) ;
     }
+
+    @VisibleForTesting
+    public static void reset() {
+        theGenerator.scopeToIdMap.clear();
+    }
 }
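
The generator keeps one counter per scope, so ids are only unique within a scope;
the new no-argument reset() clears every counter and is intended for tests (hence
@VisibleForTesting). A small illustrative sketch:

    import org.apache.pig.impl.plan.NodeIdGenerator;

    public class NodeIdSketch {
        public static void main(String[] args) {
            NodeIdGenerator gen = NodeIdGenerator.getGenerator();
            System.out.println(gen.getNextNodeId("scope-1"));  // 0
            System.out.println(gen.getNextNodeId("scope-1"));  // 1
            System.out.println(gen.getNextNodeId("scope-2"));  // 0, counters are per scope
            NodeIdGenerator.reset();                            // clears all scopes
        }
    }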

Modified: pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Fri May 30 19:07:23 2014
@@ -282,6 +282,32 @@ public abstract class OperatorPlan<E ext
             }
         }
     }
+    
+    /**
+     * Move everything below a given operator to the new operator plan.  The specified operator will
+     * be moved and will be the root of the new operator plan
+     * @param root Operator to move everything after
+     * @param newPlan new operator plan to move things into
+     * @throws PlanException 
+     */
+    public void moveTree(E root, OperatorPlan<E> newPlan) throws PlanException {
+        newPlan.add(root);
+        if (getSuccessors(root) == null) {
+            remove(root);
+            return;
+        }
+        
+        List<E> succs = new ArrayList<E>();
+        succs.addAll(getSuccessors(root));
+        
+        for (E succ : succs) {
+            moveTree(succ, newPlan);
+        }
+        remove(root);
+        for (E succ : succs) {
+            newPlan.connect(root, succ);
+        }
+    }
 
     /**
      * Trim everything above a given operator.  The specified operator will
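
The moveTree() method added above recursively detaches the given operator and all of
its successors from this plan and rebuilds the same edges in the target plan, leaving
the operator as the new plan's root. A hypothetical sketch using a PhysicalPlan,
assuming the usual POFilter(OperatorKey) constructor (POFilter is just a convenient
concrete operator here):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
    import org.apache.pig.impl.plan.OperatorKey;
    import org.apache.pig.impl.plan.PlanException;

    public class MoveTreeSketch {
        public static void main(String[] args) throws PlanException {
            PhysicalPlan plan = new PhysicalPlan();
            POFilter a = new POFilter(new OperatorKey("scope", 1));
            POFilter b = new POFilter(new OperatorKey("scope", 2));
            POFilter c = new POFilter(new OperatorKey("scope", 3));
            plan.add(a);
            plan.add(b);
            plan.add(c);
            plan.connect(a, b);  // a -> b
            plan.connect(b, c);  // b -> c

            PhysicalPlan newPlan = new PhysicalPlan();
            plan.moveTree(b, newPlan);
            // plan now holds only a; newPlan holds b -> c with b as its root.
        }
    }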

Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Fri May 30 19:07:23 2014
@@ -128,6 +128,43 @@ public class JarManager {
             return pkgClass;
         }
     }
+    
+    public static void createBootStrapJar(OutputStream os, PigContext pigContext) throws IOException {
+        JarOutputStream jarFile = new JarOutputStream(os);
+        HashMap<String, String> contents = new HashMap<String, String>();
+        Vector<JarListEntry> jarList = new Vector<JarListEntry>();
+
+        for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
+            addContainingJar(jarList, pkgToSend.getPkgClass(), pkgToSend.getPkgPrefix(), pigContext);
+        }
+
+        Iterator<JarListEntry> it = jarList.iterator();
+        while (it.hasNext()) {
+            JarListEntry jarEntry = it.next();
+            mergeJar(jarFile, jarEntry.jar, jarEntry.prefix, contents);
+        }
+
+        // Just like in MR Pig, we'll add the script files to Job.jar. For Jython, MR Pig packages
+        // all the dependencies in Job.jar. For JRuby, we need the resource pigudf.rb, which is in
+        // the pig jar. JavaScript files could be packaged either in the Job.jar or as Tez local
+        // resources; MR Pig adds them to the Job.jar so that's what we will do also. Groovy files
+        // must be added as Tez local resources in the TezPlanContainer (in MR Pig Groovy UDF's
+        // are actually broken since they cannot be found by the GroovyScriptEngine).
+        for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
+            InputStream stream = null;
+            if (entry.getValue().exists()) {
+                stream = new FileInputStream(entry.getValue());
+            } else {
+                stream = PigContext.getClassLoader().getResourceAsStream(entry.getValue().getPath());
+            }
+            if (stream == null) {
+                throw new IOException("Cannot find " + entry.getValue().getPath());
+            }
+            addStream(jarFile, entry.getKey(), stream, contents);
+        }
+
+        jarFile.close();
+    }
 
     /**
      * Create a jarfile in a temporary path, that is a merge of all the jarfiles containing the

Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Fri May 30 19:07:23 2014
@@ -26,20 +26,20 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 
 public class UDFContext {
-    
+
     private Configuration jconf = null;
     private HashMap<UDFContextKey, Properties> udfConfs;
     private Properties clientSysProps;
     private static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
-    private static final String UDF_CONTEXT = "pig.udf.context"; 
-    
-    private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() {                                                                                                                         
-        @Override                                                                                                                                                                                        
-        public UDFContext initialValue() {                                                                                                                                                               
-            return new UDFContext();                                                                                                                                                                     
-        }                                                                                                                                                                                                
-    };           
-    
+    private static final String UDF_CONTEXT = "pig.udf.context";
+
+    private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() {
+        @Override
+        public UDFContext initialValue() {
+            return new UDFContext();
+        }
+    };
+
     private UDFContext() {
         udfConfs = new HashMap<UDFContextKey, Properties>();
     }
@@ -61,10 +61,22 @@ public class UDFContext {
     /*
      *  internal pig use only - should NOT be called from user code
      */
+    public static void destroy() {
+        tss = new ThreadLocal<UDFContext>() {
+            @Override
+            public UDFContext initialValue() {
+                return new UDFContext();
+            }
+        };
+    }
+
+    /*
+     *  internal pig use only - should NOT be called from user code
+     */
     public void setClientSystemProps(Properties properties) {
         clientSysProps = properties;
     }
-    
+
     /**
      * Get the System Properties (Read only) as on the client machine from where Pig
      * was launched. This will include command line properties passed at launch
@@ -75,8 +87,8 @@ public class UDFContext {
         return clientSysProps;
     }
     /**
-     * Adds the JobConf to this singleton.  Will be 
-     * called on the backend by the Map and Reduce 
+     * Adds the JobConf to this singleton.  Will be
+     * called on the backend by the Map and Reduce
      * functions so that UDFs can obtain the JobConf
      * on the backend.
      */
@@ -99,7 +111,7 @@ public class UDFContext {
 
     /**
      * Get a properties object that is specific to this UDF.
-     * Note that if a given UDF is called multiple times in a script, 
+     * Note that if a given UDF is called multiple times in a script,
      * and each instance passes different arguments, then each will
      * be provided with different configuration object.
      * This can be used by loaders to pass their input object path
@@ -117,11 +129,11 @@ public class UDFContext {
      * the UDF unique.
      * @return A reference to the properties object specific to
      * the calling UDF.  This is a reference, not a copy.
-     * Any changes to this object will automatically be 
-     * propogated to other instances of the UDF calling this 
+     * Any changes to this object will automatically be
+     * propagated to other instances of the UDF calling this
      * function.
      */
-    
+
     @SuppressWarnings("rawtypes")
     public Properties getUDFProperties(Class c, String[] args) {
         UDFContextKey k = generateKey(c, args);
@@ -135,7 +147,7 @@ public class UDFContext {
 
     /**
      * Get a properties object that is specific to this UDF.
-     * Note that if a given UDF is called multiple times in a script, 
+     * Note that if a given UDF is called multiple times in a script,
      * they will all be provided the same configuration object.  It
      * is up to the UDF to make sure the multiple instances do not
      * stomp on each other.
@@ -151,8 +163,8 @@ public class UDFContext {
      * @param c of the UDF obtaining the properties object.
      * @return A reference to the properties object specific to
      * the calling UDF.  This is a reference, not a copy.
-     * Any changes to this object will automatically be 
-     * propogated to other instances of the UDF calling this 
+     * Any changes to this object will automatically be
+     * propagated to other instances of the UDF calling this
      * function.
      */
     @SuppressWarnings("rawtypes")
@@ -165,7 +177,7 @@ public class UDFContext {
         }
         return p;
     }
-    
+
 
 
     /**
@@ -180,7 +192,7 @@ public class UDFContext {
         conf.set(UDF_CONTEXT, ObjectSerializer.serialize(udfConfs));
         conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
     }
-    
+
     /**
      * Populate the udfConfs field.  This function is intended to
      * be called by Map.configure or Reduce.configure on the backend.
@@ -188,20 +200,20 @@ public class UDFContext {
      * @throws IOException if underlying deseralization throws it
      */
     @SuppressWarnings("unchecked")
-    public void deserialize() throws IOException {  
+    public void deserialize() throws IOException {
         udfConfs = (HashMap<UDFContextKey, Properties>)ObjectSerializer.deserialize(jconf.get(UDF_CONTEXT));
         clientSysProps = (Properties)ObjectSerializer.deserialize(
                 jconf.get(CLIENT_SYS_PROPS));
     }
-    
+
     private UDFContextKey generateKey(Class<?> c, String[] args) {
         return new UDFContextKey(c.getName(), args);
     }
-    
+
     public void reset() {
         udfConfs.clear();
     }
-    
+
     public boolean isUDFConfEmpty() {
         return udfConfs.isEmpty();
     }
@@ -217,10 +229,11 @@ public class UDFContext {
                 || (jconf.get("mapred.task.id") == null &&
                     jconf.get("mapreduce.job.application.attempt.id") == null));
     }
-    
+
     /**
      * Make a shallow copy of the context.
      */
+    @Override
     public UDFContext clone() {
     	UDFContext other = new UDFContext();
     	other.clientSysProps = this.clientSysProps;
@@ -228,10 +241,10 @@ public class UDFContext {
     	other.udfConfs = this.udfConfs;
     	return other;
     }
-    
+
     /**
-     * Class that acts as key for hashmap in UDFContext, 
-     *  it holds the class and args of the udf, and 
+     * Class that acts as key for hashmap in UDFContext,
+     *  it holds the class and args of the udf, and
      *  implements equals() and hashCode()
      */
     private static class UDFContextKey implements Serializable{
@@ -239,13 +252,13 @@ public class UDFContext {
         private static final long serialVersionUID = 1;
         private String className;
         private String[] args;
-        
+
         UDFContextKey(){
         }
 
         UDFContextKey(String className, String [] args){
             this.className = className;
-            this.args = args;        
+            this.args = args;
         }
 
         /* (non-Javadoc)
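
The properties bags handed out by getUDFProperties() are keyed by (class, args), so a
UDF instantiated with different arguments gets a different bag, and the whole map
travels from the front end to the backend through serialize()/deserialize(). A
hypothetical front-end-only sketch (MyUdf stands in for a real UDF class):

    import java.util.Properties;
    import org.apache.pig.impl.util.UDFContext;

    public class UdfContextSketch {
        // Hypothetical class used only as the properties key.
        static class MyUdf { }

        public static void main(String[] args) {
            UDFContext ctx = UDFContext.getUDFContext();

            // Front end: stash a value keyed by (class, constructor args).
            Properties p = ctx.getUDFProperties(MyUdf.class, new String[] { "arg0" });
            p.setProperty("my.udf.delimiter", "\t");

            // The same (class, args) pair returns the same Properties object; on the
            // backend this happens after UDFContext.deserialize() has run in the task.
            String delim = ctx.getUDFProperties(MyUdf.class, new String[] { "arg0" })
                    .getProperty("my.udf.delimiter");
            System.out.println(delim);
        }
    }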

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Fri May 30 19:07:23 2014
@@ -23,8 +23,10 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.SequenceInputStream;
+import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
@@ -39,8 +41,11 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobConf;
@@ -57,6 +62,7 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.ReadToEndLoader;
@@ -305,7 +311,7 @@ public class Utils {
                 return false;
             }
         }
-        
+
         public String supportedCodecsToString() {
             StringBuffer sb = new StringBuffer();
             boolean first = true;
@@ -313,7 +319,7 @@ public class Utils {
                 if(first) {
                     first = false;
                 } else {
-                    sb.append(",");    
+                    sb.append(",");
                 }
                 sb.append(codec.name());
             }
@@ -520,6 +526,54 @@ public class Utils {
         }
     }
 
+    /**
+     * If the url is not already in HDFS, copies it from the local file system to HDFS
+     * before adding it to the distributed cache.
+     * @param pigContext the pigContext
+     * @param conf the job conf
+     * @param url the url to be added to distributed cache
+     * @throws IOException
+     */
+    @SuppressWarnings("deprecation")
+    public static void putJarOnClassPathThroughDistributedCache(PigContext pigContext,
+            Configuration conf, URL url) throws IOException {
+        // Turn on the symlink feature
+        DistributedCache.createSymlink(conf);
+
+        // REGISTER always copies locally the jar file. see PigServer.registerJar()
+        Path pathInHDFS = Utils.shipToHDFS(pigContext, conf, url);
+        // and add to the DistributedCache
+        DistributedCache.addFileToClassPath(pathInHDFS, conf);
+        pigContext.skipJars.add(url.getPath());
+    }
+
+    /**
+     * copy the file to hdfs in a temporary path
+     * @param pigContext the pig context
+     * @param conf the job conf
+     * @param url the url to ship to hdfs
+     * @return the location where it was shipped
+     * @throws IOException
+     */
+    public static Path shipToHDFS(PigContext pigContext, Configuration conf, URL url)
+            throws IOException {
+        String path = url.getPath();
+        int slash = path.lastIndexOf("/");
+        String suffix = slash == -1 ? path : path.substring(slash+1);
+
+        Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix);
+        FileSystem fs = dst.getFileSystem(conf);
+        OutputStream os = fs.create(dst);
+        try {
+            IOUtils.copyBytes(url.openStream(), os, 4096, true);
+        } finally {
+            // IOUtils can not close both the input and the output properly in a finally
+            // as we can get an exception in between opening the stream and calling the method
+            os.close();
+        }
+        return dst;
+    }
+
     public static String getStackStraceStr(Throwable e) {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);