Posted to commits@pig.apache.org by ro...@apache.org on 2014/01/23 02:06:22 UTC

svn commit: r1560565 [2/3] - in /pig/branches/tez: shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/optimizer/ src/org/apach...

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ColumnChainInfo;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ColumnInfo;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SortKeyInfo;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerUtil {
+    private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
+
+    private SecondaryKeyOptimizerUtil() {
+    }
+
+    private static class POToChange {
+        POToChange(PhysicalOperator oper, PhysicalPlan plan, POForEach forEach) {
+            this.oper = oper;
+            this.plan = plan;
+            this.forEach = forEach;
+        }
+
+        PhysicalOperator oper;
+
+        PhysicalPlan plan;
+
+        POForEach forEach;
+    }
+
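+    /** Statistics and state gathered while applying the secondary key optimization. */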
+    public static class SecondaryKeyOptimizerInfo {
+
+        private int numSortRemoved = 0;
+        private int numDistinctChanged = 0;
+        private int numUseSecondaryKey = 0;
+        private boolean useSecondaryKey = false;
+        private boolean[] secondarySortOrder;
+
+        public int getNumSortRemoved() {
+            return numSortRemoved;
+        }
+
+        public void incrementNumSortRemoved() {
+            this.numSortRemoved++;
+        }
+
+        public int getNumDistinctChanged() {
+            return numDistinctChanged;
+        }
+
+        public void incrementNumDistinctChanged() {
+            this.numDistinctChanged++;
+        }
+
+        public int getNumUseSecondaryKey() {
+            return numUseSecondaryKey;
+        }
+
+        public void incrementNumUseSecondaryKey() {
+            this.numUseSecondaryKey++;
+        }
+
+        public boolean isUseSecondaryKey() {
+            return useSecondaryKey;
+        }
+
+        public void setUseSecondaryKey(boolean useSecondaryKey) {
+            this.useSecondaryKey = useSecondaryKey;
+        }
+
+        public boolean[] getSecondarySortOrder() {
+            return secondarySortOrder;
+        }
+
+        public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+            this.secondarySortOrder = secondarySortOrder;
+        }
+
+    }
+
+    // Build sort key structure from POLocalRearrange
+    private static SortKeyInfo getSortKeyInfo(POLocalRearrange rearrange) throws ExecException {
+        SortKeyInfo result = new SortKeyInfo();
+        List<PhysicalPlan> plans = rearrange.getPlans();
+        nextPlan: for (int i = 0; i < plans.size(); i++) {
+            PhysicalPlan plan = plans.get(i);
+            ColumnChainInfo columnChainInfo = new ColumnChainInfo();
+            if (plan.getRoots() == null) {
+                log.debug("POLocalRearrange plan has no roots");
+                return null;
+            } else if (plan.getRoots().size() != 1) {
+                // POLocalRearrange plan contains more than 1 root.
+                // Probably there is an Expression operator in the local
+                // rearrangement plan, skip this plan
+                continue nextPlan;
+            } else {
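+                // Record which input this key came from; mask off the
+                // multi-query flag bit so only the plain input index remains.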
+                List<Integer> columns = new ArrayList<Integer>();
+                columns.add(rearrange.getIndex()
+                        & PigNullableWritable.idxSpace);
+
+                // The first item inside columnChainInfo is set to type Tuple.
+                // This value is not actually used, but it is intended to match
+                // the type of the POProject on the reduce side.
+                columnChainInfo.insert(columns, DataType.TUPLE);
+
+                PhysicalOperator node = plan.getRoots().get(0);
+                while (node != null) {
+                    if (node instanceof POProject) {
+                        POProject project = (POProject) node;
+                        if (project.isProjectToEnd()) {
+                            columnChainInfo.insert(project.getStartCol(),
+                                    project.getResultType());
+                        } else {
+                            columnChainInfo.insert(
+                                    project.getColumns(), project.getResultType());
+                        }
+
+                        if (plan.getSuccessors(node) == null)
+                            node = null;
+                        else if (plan.getSuccessors(node).size() != 1) {
+                            log.debug(node + " has more than one successor");
+                            node = null;
+                        } else
+                            node = plan.getSuccessors(node).get(0);
+                    } else
+                        // constant or UDF in the chain; skip this plan
+                        continue nextPlan;
+                }
+            }
+            // Assume the main key is sorted ascending. We could further
+            // optimize by matching one of the nested sort/distinct keys, since we do
+            // not really care how the cogroup key is sorted; but that may no longer
+            // hold if we ever switch all the comparators to byte comparators, so just
+            // leave it as it is for now.
+            result.insertColumnChainInfo(i, columnChainInfo, true);
+        }
+        return result;
+    }
+
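+    /**
+     * Examines the POLocalRearrange(s) feeding the shuffle in the map plan and
+     * the nested plans of the first POForEach in the reduce plan; when a nested
+     * sort/distinct key can be served by the shuffle, rewires the operators to
+     * use a secondary sort key. Returns null when the optimization does not apply.
+     */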
+    public static SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException {
+        log.trace("Entering SecondaryKeyOptimizerUtil.applySecondaryKeySort");
+        SecondaryKeyOptimizerInfo secKeyOptimizerInfo = new SecondaryKeyOptimizerInfo();
+        List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
+        SortKeyInfo secondarySortKeyInfo = null;
+        List<POToChange> sortsToRemove = null;
+        List<POToChange> distinctsToChange = null;
+
+        List<PhysicalOperator> mapLeaves = mapPlan.getLeaves();
+        if (mapLeaves == null || mapLeaves.size() != 1) {
+            log.debug("Expected map plan to have a single leaf! Skip secondary key optimizing");
+            return null;
+        }
+        PhysicalOperator mapLeaf = mapLeaves.get(0);
+
+        // Figure out the main key of the map-reduce job from POLocalRearrange
+        try {
+            if (mapLeaf instanceof POLocalRearrange) {
+                SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) mapLeaf);
+                if (sortKeyInfo == null) {
+                    log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+                    return null;
+                }
+                sortKeyInfos.add(sortKeyInfo);
+            } else if (mapLeaf instanceof POUnion) {
+                List<PhysicalOperator> preds = mapPlan
+                        .getPredecessors(mapLeaf);
+                for (PhysicalOperator pred : preds) {
+                    if (pred instanceof POLocalRearrange) {
+                        SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) pred);
+                        if (sortKeyInfo == null) {
+                            log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+                            return null;
+                        }
+                        sortKeyInfos.add(sortKeyInfo);
+                    }
+                }
+            } else {
+                log.debug("Cannot find POLocalRearrange or POUnion in map leaf, skip secondary key optimizing");
+                return null;
+            }
+        } catch (ExecException e) {
+            log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+            return null;
+        }
+
+        if (reducePlan.isEmpty()) {
+            log.debug("Reduce plan is empty, skip secondary key optimizing");
+            return null;
+        }
+
+        List<PhysicalOperator> reduceRoots = reducePlan.getRoots();
+        if (reduceRoots.size() != 1) {
+            log.debug("Expected reduce to have single root, skip secondary key optimizing");
+            return null;
+        }
+
+        PhysicalOperator root = reduceRoots.get(0);
+        if (!(root instanceof POPackage)) {
+            log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+            return null;
+        }
+
+        // Visit the POForEach of the reduce plan; there may be Limit and Filter
+        // operators in between.
+        PhysicalOperator currentNode = root;
+        POForEach foreach = null;
+        while (currentNode != null) {
+            if (currentNode instanceof POPackage
+                    && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager)
+                    || currentNode instanceof POFilter
+                    || currentNode instanceof POLimit) {
+                List<PhysicalOperator> succs = reducePlan
+                        .getSuccessors(currentNode);
+                if (succs == null) // We didn't find POForEach
+                    return null;
+                if (succs.size() != 1) {
+                    log.debug("Saw multiple outputs for " + currentNode
+                            + " in reduce plan, skip secondary key optimizing");
+                    return null;
+                }
+                currentNode = succs.get(0);
+            } else if (currentNode instanceof POForEach) {
+                foreach = (POForEach) currentNode;
+                break;
+            } else { // Skip optimization
+                return null;
+            }
+        }
+
+        // We did not find a foreach (we should never get here; this check just
+        // keeps findbugs happy)
+        if (foreach==null)
+            return null;
+
+        sortsToRemove = new ArrayList<POToChange>();
+        distinctsToChange = new ArrayList<POToChange>();
+
+        for (PhysicalPlan innerPlan : foreach.getInputPlans()) {
+            // visit inner plans to figure out the sort order for distinct /
+            // sort
+            SecondaryKeyDiscover innerPlanDiscover = new SecondaryKeyDiscover(
+                    innerPlan, sortKeyInfos, secondarySortKeyInfo);
+            try {
+                innerPlanDiscover.process();
+            } catch (FrontendException e) {
+                int errorCode = 2213;
+                throw new VisitorException("Error visiting inner plan for ForEach", errorCode, e);
+            }
+            secondarySortKeyInfo = innerPlanDiscover.getSecondarySortKeyInfo();
+            if (innerPlanDiscover.getSortsToRemove() != null) {
+                for (POSort sort : innerPlanDiscover.getSortsToRemove()) {
+                    sortsToRemove.add(new POToChange(sort, innerPlan, foreach));
+                }
+            }
+            if (innerPlanDiscover.getDistinctsToChange() != null) {
+                for (PODistinct distinct : innerPlanDiscover
+                        .getDistinctsToChange()) {
+                    distinctsToChange.add(new POToChange(distinct, innerPlan,
+                            foreach));
+                }
+            }
+        }
+
+        try {
+            // Change PODistinct to POSortedDistinct, which assumes the input
+            // data is sorted
+            for (POToChange distinctToChange : distinctsToChange) {
+                secKeyOptimizerInfo.incrementNumDistinctChanged();
+                PODistinct oldDistinct = (PODistinct) distinctToChange.oper;
+                String scope = oldDistinct.getOperatorKey().scope;
+                POSortedDistinct newDistinct = new POSortedDistinct(
+                        new OperatorKey(scope, NodeIdGenerator.getGenerator()
+                                .getNextNodeId(scope)),
+                        oldDistinct.getRequestedParallelism(),
+                        oldDistinct.getInputs());
+                newDistinct.setInputs(oldDistinct.getInputs());
+                newDistinct.setResultType(oldDistinct.getResultType());
+                distinctToChange.plan.replace(oldDistinct, newDistinct);
+                distinctToChange.forEach.getLeaves();
+            }
+            // Remove the POSort; if the successor requires a databag, we need
+            // to add a PORelationToExprProject to convert tuples into a databag
+            for (POToChange sortToRemove : sortsToRemove) {
+                secKeyOptimizerInfo.incrementNumSortRemoved();
+                POSort oldSort = (POSort) sortToRemove.oper;
+                String scope = oldSort.getOperatorKey().scope;
+                List<PhysicalOperator> preds = sortToRemove.plan
+                        .getPredecessors(sortToRemove.oper);
+                List<PhysicalOperator> succs = sortToRemove.plan
+                        .getSuccessors(sortToRemove.oper);
+                POProject project = null;
+                if ((preds == null
+                        || preds.get(0).getResultType() != DataType.BAG
+                        && oldSort.getResultType() == DataType.BAG) // the sort being removed changes the result type
+                        && (succs == null || !(succs.get(0) instanceof PORelationToExprProject))) // successor is not already a PORelationToExprProject
+                {
+                    project = new PORelationToExprProject(new OperatorKey(
+                            scope, NodeIdGenerator.getGenerator()
+                                    .getNextNodeId(scope)),
+                            oldSort.getRequestedParallelism());
+                    project.setInputs(oldSort.getInputs());
+                    project.setResultType(DataType.BAG);
+                    project.setStar(true);
+                }
+                if (project == null)
+                    sortToRemove.plan.removeAndReconnect(sortToRemove.oper);
+                else
+                    sortToRemove.plan.replace(oldSort, project);
+                sortToRemove.forEach.getLeaves();
+            }
+        } catch (PlanException e) {
+            int errorCode = 2202;
+            throw new VisitorException(
+                    "Error changing distinct/sort to use the secondary key optimizer",
+                    errorCode, e);
+        }
+        if (secondarySortKeyInfo != null) {
+            // Adjust POLocalRearrange, POPackage, MapReduceOper to use the
+            // secondary key
+            secKeyOptimizerInfo.incrementNumUseSecondaryKey();
+            secKeyOptimizerInfo.setUseSecondaryKey(true);
+            secKeyOptimizerInfo.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
+            int indexOfRearrangeToChange = -1;
+            for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo
+                    .getColumnChains()) {
+                ColumnInfo currentColumn = columnChainInfo.getColumnInfos()
+                        .get(0);
+                int index = currentColumn.getColumns().get(0);
+                if (indexOfRearrangeToChange == -1)
+                    indexOfRearrangeToChange = index;
+                else if (indexOfRearrangeToChange != index) {
+                    int errorCode = 2203;
+                    throw new VisitorException("Sort on columns from different inputs.", errorCode);
+                }
+            }
+            if (mapLeaf instanceof POLocalRearrange) {
+                ((POLocalRearrange) mapLeaf).setUseSecondaryKey(true);
+                setSecondaryPlan(mapPlan, (POLocalRearrange) mapLeaf,
+                        secondarySortKeyInfo);
+            } else if (mapLeaf instanceof POUnion) {
+                List<PhysicalOperator> preds = mapPlan
+                        .getPredecessors(mapLeaf);
+                boolean found = false;
+                for (PhysicalOperator pred : preds) {
+                    POLocalRearrange rearrange = (POLocalRearrange) pred;
+                    rearrange.setUseSecondaryKey(true);
+                    if (rearrange.getIndex() == indexOfRearrangeToChange) {
+                        // Found the POLocalRearrange that takes the secondary key plan
+                        found = true;
+                        setSecondaryPlan(mapPlan, rearrange, secondarySortKeyInfo);
+                    }
+                }
+                if (!found) {
+                    int errorCode = 2214;
+                    throw new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
+                }
+            }
+            POPackage pack = (POPackage) root;
+            pack.getPkgr().setUseSecondaryKey(true);
+        }
+        return secKeyOptimizerInfo;
+    }
+
+    private static void setSecondaryPlan(PhysicalPlan plan, POLocalRearrange rearrange,
+            SortKeyInfo secondarySortKeyInfo) throws VisitorException {
+        // Attach plans that project the secondary key to the POLocalRearrange
+        try {
+            String scope = rearrange.getOperatorKey().scope;
+            List<PhysicalPlan> secondaryPlanList = new ArrayList<PhysicalPlan>();
+            for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo
+                    .getColumnChains()) {
+                PhysicalPlan secondaryPlan = new PhysicalPlan();
+                for (int i = 1; i < columnChainInfo.size(); i++) {
+                    // The first item in columnChainInfo indicates the index of
+                    // the input, which we have already handled before getting
+                    // here
+                    ColumnInfo columnInfo = columnChainInfo.getColumnInfo(i);
+                    POProject project = new POProject(
+                            new OperatorKey(scope, NodeIdGenerator
+                                    .getGenerator().getNextNodeId(scope)),
+                            rearrange.getRequestedParallelism());
+                    if (columnInfo.isRangeProject())
+                        project.setProjectToEnd(columnInfo.getStartCol());
+                    else
+                        project.setColumns(
+                                (ArrayList<Integer>) columnInfo.getColumns());
+                    project.setResultType(columnInfo.getResultType());
+                    secondaryPlan.addAsLeaf(project);
+                }
+                if (secondaryPlan.isEmpty()) { // the secondary key sorts on
+                                               // the input as a whole
+                    POProject project = new POProject(
+                            new OperatorKey(scope, NodeIdGenerator
+                                    .getGenerator().getNextNodeId(scope)),
+                            rearrange.getRequestedParallelism());
+                    project.setStar(true);
+                    secondaryPlan.addAsLeaf(project);
+                }
+                secondaryPlanList.add(secondaryPlan);
+            }
+            rearrange.setSecondaryPlans(secondaryPlanList);
+        } catch (PlanException e) {
+            int errorCode = 2204;
+            throw new VisitorException("Error setting secondary key plan",
+                    errorCode, e);
+        }
+    }
+
+    // Find eligible sort and distinct physical operators in the reduce plan.
+    // SecondaryKeyDiscover checks the sort/distinct keys (for distinct, the key
+    // is always the entire bag); if a key is the same as the main key, we can
+    // simply remove the operator. If not, we can have one secondary sort key:
+    // the first such sort/distinct key becomes the secondary sort key. For
+    // subsequent sorts/distincts we cannot do any secondary key optimization,
+    // because we only have one secondary sort key.
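+    //
+    // A sketch of a Pig Latin script where this applies (hypothetical names):
+    //   B = GROUP A BY id;
+    //   C = FOREACH B { S = ORDER A BY value; GENERATE group, S; }
+    // Here the nested ORDER BY key can become the secondary sort key, so the
+    // POSort inside the nested plan can be removed.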
+    private static class SecondaryKeyDiscover {
+        PhysicalPlan mPlan;
+
+        List<POSort> sortsToRemove = new ArrayList<POSort>();
+
+        List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();
+
+        List<SortKeyInfo> sortKeyInfos;
+
+        SortKeyInfo secondarySortKeyInfo;
+
+        ColumnChainInfo columnChainInfo = null;
+
+        // The PhysicalPlan here is the foreach inner plan
+        SecondaryKeyDiscover(PhysicalPlan plan,
+                List<SortKeyInfo> sortKeyInfos, SortKeyInfo secondarySortKeyInfo) {
+            this.mPlan = plan;
+            this.sortKeyInfos = sortKeyInfos;
+            this.secondarySortKeyInfo = secondarySortKeyInfo;
+        }
+
+        public void process() throws FrontendException {
+            List<PhysicalOperator> roots = mPlan.getRoots();
+            for (PhysicalOperator root : roots) {
+                columnChainInfo = new ColumnChainInfo();
+                processRoot(root);
+            }
+        }
+
+        public void processRoot(PhysicalOperator root) throws FrontendException {
+            PhysicalOperator currentNode = root;
+            while (currentNode!=null) {
+                boolean sawInvalidPhysicalOper = false;
+                if (currentNode instanceof PODistinct)
+                    sawInvalidPhysicalOper = processDistinct((PODistinct)currentNode);
+                else if (currentNode instanceof POSort)
+                    sawInvalidPhysicalOper = processSort((POSort)currentNode);
+                else if (currentNode instanceof POProject)
+                    sawInvalidPhysicalOper = processProject((POProject)currentNode);
+                else if (currentNode instanceof POUserFunc ||
+                         currentNode instanceof POUnion ||
+                         // We don't process foreach, since foreach is too complex to get right
+                         currentNode instanceof POForEach)
+                    break;
+
+                if (sawInvalidPhysicalOper)
+                    break;
+
+                List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
+                if (succs==null)
+                    currentNode = null;
+                else {
+                    if (succs.size()>1) {
+                        int errorCode = 2215;
+                        throw new FrontendException("Saw more than one successor in the nested plan for " + currentNode,
+                                errorCode);
+                    }
+                    currentNode = succs.get(0);
+                }
+            }
+        }
+
+        // We saw a PODistinct; check which key it uses
+        public boolean processDistinct(PODistinct distinct) throws FrontendException {
+            SortKeyInfo keyInfos = new SortKeyInfo();
+            try {
+                keyInfos.insertColumnChainInfo(0,
+                        (ColumnChainInfo) columnChainInfo.clone(), true);
+            } catch (CloneNotSupportedException e) { // We implement Cloneable,
+                                                     // impossible to get here
+            }
+
+            // if it is part of main key
+            for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
+                if (sortKeyInfo.moreSpecificThan(keyInfos)) {
+                    distinctsToChange.add(distinct);
+                    return false;
+                }
+            }
+
+            // if it is part of secondary key
+            if (secondarySortKeyInfo != null
+                    && secondarySortKeyInfo.moreSpecificThan(keyInfos)) {
+                distinctsToChange.add(distinct);
+                return false;
+            }
+
+            // Now set the secondary key
+            if (secondarySortKeyInfo == null) {
+                distinctsToChange.add(distinct);
+                secondarySortKeyInfo = keyInfos;
+            }
+            return false;
+        }
+
+        // Accumulate column info
+        public boolean processProject(POProject project) throws FrontendException {
+            columnChainInfo.insertInReduce(project);
+            return false;
+        }
+
+        // We saw a POSort; check which key it uses
+        public boolean processSort(POSort sort) throws FrontendException{
+            SortKeyInfo keyInfo = new SortKeyInfo();
+            for (int i = 0; i < sort.getSortPlans().size(); i++) {
+                PhysicalPlan sortPlan = sort.getSortPlans().get(i);
+                ColumnChainInfo sortChainInfo = null;
+                try {
+                    sortChainInfo = (ColumnChainInfo) columnChainInfo.clone();
+                } catch (CloneNotSupportedException e) { // We implement
+                                                         // Cloneable, impossible
+                                                         // to get here
+                }
+                boolean r = false;
+                try {
+                    r = collectColumnChain(sortPlan, sortChainInfo);
+                } catch (PlanException e) {
+                    int errorCode = 2206;
+                    throw new FrontendException("Error visiting POSort inner plan",
+                            errorCode, e);
+                }
+                if (r) { // we saw a physical operator other than project in the sort plan
+                    return true;
+                }
+                keyInfo.insertColumnChainInfo(i, sortChainInfo, sort
+                        .getMAscCols().get(i));
+            }
+            // if it is part of main key
+            for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
+                if (sortKeyInfo.moreSpecificThan(keyInfo)) {
+                    sortsToRemove.add(sort);
+                    return false;
+                }
+            }
+            // if it is part of secondary key
+            if (secondarySortKeyInfo != null
+                    && secondarySortKeyInfo.moreSpecificThan(keyInfo)) {
+                sortsToRemove.add(sort);
+                return false;
+            }
+
+            // Now set the secondary key
+            if (secondarySortKeyInfo == null) {
+                sortsToRemove.add(sort);
+                secondarySortKeyInfo = keyInfo;
+            }
+            return false;
+        }
+
+        public List<POSort> getSortsToRemove() {
+            return sortsToRemove;
+        }
+
+        public List<PODistinct> getDistinctsToChange() {
+            return distinctsToChange;
+        }
+
+        public SortKeyInfo getSecondarySortKeyInfo() {
+            return secondarySortKeyInfo;
+        }
+    }
+
+    // Returns true if we saw a physical operator other than project in the plan
+    private static boolean collectColumnChain(PhysicalPlan plan,
+            ColumnChainInfo columnChainInfo) throws PlanException {
+        if (plan.getRoots().size() != 1) {
+            return true;
+        }
+
+        PhysicalOperator currentNode = plan.getRoots().get(0);
+
+        while (currentNode != null) {
+            if (currentNode instanceof POProject) {
+                POProject project = (POProject) currentNode;
+                columnChainInfo.insertInReduce(project);
+            } else {
+                return true;
+            }
+            List<PhysicalOperator> succs = plan.getSuccessors(currentNode);
+            if (succs == null)
+                break;
+            if (succs.size() != 1) {
+                int errorCode = 2208;
+                throw new PlanException(
+                        "Exception visiting foreach inner plan", errorCode);
+            }
+            currentNode = succs.get(0);
+        }
+        return false;
+    }
+
+}

Modified: pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java Thu Jan 23 01:06:21 2014
@@ -42,6 +42,12 @@ public class NullableTuple extends PigNu
         mValue = t;
     }
 
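+    /**
+     * Shallow-copy constructor: copies the null flag and index, and reuses the
+     * underlying tuple reference from {@code copy}.
+     */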
+    public NullableTuple(NullableTuple copy) {
+        setNull(copy.isNull());
+        mValue = copy.mValue;
+        setIndex(copy.getIndex());
+    }
+
     @Override
     public Object getValueAsPigType() {
         return isNull() ? null : (Tuple)mValue;

Modified: pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java Thu Jan 23 01:06:21 2014
@@ -61,6 +61,14 @@ public abstract class PigNullableWritabl
 
     private byte mIndex;
 
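+    /**
+     * Makes a shallow copy of the given writable by instantiating its concrete
+     * class reflectively; the subclass must have a public no-arg constructor.
+     */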
+    public static PigNullableWritable newInstance(PigNullableWritable copy) throws Exception {
+        PigNullableWritable instance = copy.getClass().newInstance();
+        instance.mNull = copy.mNull;
+        instance.mValue = copy.mValue;
+        instance.mIndex = copy.mIndex;
+        return instance;
+    }
+
     /**
      * Compare two nullable objects.  Step one is to check if either or both
      * are null.  If one is null and the other is not, then the one that is

Modified: pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java Thu Jan 23 01:06:21 2014
@@ -17,20 +17,20 @@
  */
 package org.apache.pig.test;
 
-import java.io.*;
+import java.io.IOException;
 import java.util.Properties;
 
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.pig.ExecType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 
 /**
- * This class builds a single instance of itself with the Singleton 
- * design pattern. While building the single instance, it sets up a 
- * mini cluster that actually consists of a mini DFS cluster and a 
- * mini MapReduce cluster on the local machine and also sets up the 
+ * This class builds a single instance of itself with the Singleton
+ * design pattern. While building the single instance, it sets up a
+ * mini cluster that actually consists of a mini DFS cluster and a
+ * mini MapReduce cluster on the local machine and also sets up the
  * environment for Pig to run on top of the mini cluster.
  *
  * This class is the base class for MiniCluster, which has slightly
@@ -45,6 +45,9 @@ abstract public class MiniGenericCluster
     protected static MiniGenericCluster INSTANCE = null;
     protected static boolean isSetup = false;
 
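+    // Accepted values for the test.exec.type system property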
+    public static String EXECTYPE_MR = "mr";
+    public static String EXECTYPE_TEZ = "tez";
+
     /**
      * Returns the single instance of class MiniGenericCluster that represents
      * the resources for a mini dfs cluster and a mini mr (or tez) cluster. The
@@ -58,9 +61,16 @@ abstract public class MiniGenericCluster
                 throw new RuntimeException("test.exec.type is not set");
             }
 
-            if (execType.equalsIgnoreCase("mr")) {
+            return buildCluster(execType);
+        }
+        return INSTANCE;
+    }
+
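+    /**
+     * Builds the singleton mini cluster for the given exec type ("mr" or
+     * "tez"); returns the existing instance if one is already set up.
+     */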
+    public static MiniGenericCluster buildCluster(String execType) {
+        if (INSTANCE == null) {
+            if (execType.equalsIgnoreCase(EXECTYPE_MR)) {
                 INSTANCE = new MiniCluster();
-            } else if (execType.equalsIgnoreCase("tez")) {
+            } else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) {
                 INSTANCE = new TezMiniCluster();
             } else {
                 throw new RuntimeException("Unknown test.exec.type: " + execType);
@@ -73,22 +83,25 @@ abstract public class MiniGenericCluster
         return INSTANCE;
     }
 
+    abstract protected ExecType getExecType();
+
     abstract protected void setupMiniDfsAndMrClusters();
 
     public void shutDown(){
         INSTANCE.shutdownMiniDfsAndMrClusters();
     }
-    
+
+    @Override
     protected void finalize() {
         shutdownMiniDfsAndMrClusters();
     }
-    
+
     protected void shutdownMiniDfsAndMrClusters() {
         isSetup = false;
         shutdownMiniDfsClusters();
         shutdownMiniMrClusters();
     }
-    
+
     protected void shutdownMiniDfsClusters() {
         try {
             if (m_fileSys != null) { m_fileSys.close(); }
@@ -99,7 +112,7 @@ abstract public class MiniGenericCluster
         m_fileSys = null;
         m_dfs = null;
     }
-    
+
     abstract protected void shutdownMiniMrClusters();
 
     public Properties getProperties() {
@@ -115,12 +128,12 @@ abstract public class MiniGenericCluster
         errorIfNotSetup();
         m_conf.set(name, value);
     }
-    
+
     public FileSystem getFileSystem() {
         errorIfNotSetup();
         return m_fileSys;
     }
-    
+
     /**
      * Throw RunTimeException if isSetup is false
      */

Modified: pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java Thu Jan 23 01:06:21 2014
@@ -37,7 +37,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestAccumulator {
@@ -413,8 +412,6 @@ public class TestAccumulator {
     }
 
     @Test
-    @Ignore
-    // TODO: Enabled the test case after SecondaryKeyOptimization is implemented in Tez
     public void testAccumWithDistinct() throws IOException{
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
         pigServer.registerQuery("B = group A by id;");
@@ -429,15 +426,16 @@ public class TestAccumulator {
 
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
+        int count = 0;
         while(iter.hasNext()) {
+            count++;
             Tuple t = iter.next();
             assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
         }
+        assertEquals(4, count);
     }
 
     @Test
-    @Ignore
-    // TODO: Enabled the test case after SecondaryKeyOptimization is implemented in Tez
     public void testAccumWithSort() throws IOException{
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
         pigServer.registerQuery("B = foreach A generate id, f, id as t;");
@@ -453,10 +451,13 @@ public class TestAccumulator {
 
         Iterator<Tuple> iter = pigServer.openIterator("D");
 
+        int count = 0;
         while(iter.hasNext()) {
+            count++;
             Tuple t = iter.next();
             assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));
         }
+        assertEquals(4, count);
     }
 
     @Test
@@ -683,8 +684,6 @@ public class TestAccumulator {
      * @throws ParseException
      */
     @Test
-    @Ignore
-    // TODO: Enabled the test case after SecondaryKeyOptimization is implemented in Tez
     public void testAccumAfterNestedOp() throws IOException, ParserException{
         // test group by
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");

Modified: pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCombiner.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestCombiner.java Thu Jan 23 01:06:21 2014
@@ -50,7 +50,6 @@ import org.junit.Test;
 public class TestCombiner {
 
     static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
-    static String execType = System.getProperty("test.exec.type");
 
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
@@ -88,7 +87,7 @@ public class TestCombiner {
                 "c = group a by c2; " +
                 "f = foreach c generate COUNT(org.apache.pig.builtin.Distinct($1.$2)); " +
                 "store f into 'out';";
-        PigServer pigServer = new PigServer(execType, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext pc = pigServer.getPigContext();
         assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
                 .isEmpty()));
@@ -103,7 +102,7 @@ public class TestCombiner {
                 "f = foreach c generate COUNT(" + dummyUDF + "" +
                 "(org.apache.pig.builtin.Distinct($1.$2)," + dummyUDF + "())); " +
                 "store f into 'out';";
-        PigServer pigServer = new PigServer(execType, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext pc = pigServer.getPigContext();
         assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
                 .isEmpty()));
@@ -113,7 +112,7 @@ public class TestCombiner {
     @Test
     public void testOnCluster() throws Exception {
         // run the test on cluster
-        PigServer pigServer = new PigServer(execType, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         String inputFileName = runTest(pigServer);
         Util.deleteFile(cluster, inputFileName);
         pigServer.shutdown();
@@ -190,7 +189,7 @@ public class TestCombiner {
         Util.createInputFile(cluster, "MultiCombinerUseInput.txt", input);
         Properties props = cluster.getProperties();
         props.setProperty("io.sort.mb", "1");
-        PigServer pigServer = new PigServer(execType, props);
+        PigServer pigServer = new PigServer(cluster.getExecType(), props);
         pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
         pigServer.registerQuery("b = group a all;");
         pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), " +
@@ -231,7 +230,7 @@ public class TestCombiner {
                         "pig1\t20\t3.1" };
 
         Util.createInputFile(cluster, "distinctAggs1Input.txt", input);
-        PigServer pigServer = new PigServer(execType, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load 'distinctAggs1Input.txt' as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("b = group a by name;");
         pigServer.registerQuery("c = foreach b  {" +
@@ -255,7 +254,7 @@ public class TestCombiner {
         while (it.hasNext()) {
             Tuple t = it.next();
             List<Object> fields = t.getAll();
-            Object[] expected = results.get((String)fields.get(0));
+            Object[] expected = results.get(fields.get(0));
             int i = 0;
             for (Object field : fields) {
                 assertEquals(expected[i++], field);
@@ -278,7 +277,7 @@ public class TestCombiner {
         };
 
         Util.createInputFile(cluster, "testGroupElements.txt", input);
-        PigServer pigServer = new PigServer(execType, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load 'testGroupElements.txt' as (str:chararray, num1:int, alph : chararray, num2 : int);");
         pigServer.registerQuery("b = group a by (str, num1);");
 
@@ -339,7 +338,7 @@ public class TestCombiner {
         };
 
         Util.createInputFile(cluster, "testGroupLimit.txt", input);
-        PigServer pigServer = new PigServer(execType, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load 'testGroupLimit.txt'  using PigStorage(' ') " +
                 "as (str:chararray, num1:int) ;");
         pigServer.registerQuery("b = group a by str;");
@@ -388,7 +387,7 @@ public class TestCombiner {
                         "pig1\t20\t3.1" };
 
         Util.createInputFile(cluster, "distinctNoCombinerInput.txt", input);
-        PigServer pigServer = new PigServer(execType, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load 'distinctNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("b = group a by name;");
         pigServer.registerQuery("c = foreach b  {" +
@@ -409,7 +408,7 @@ public class TestCombiner {
         while (it.hasNext()) {
             Tuple t = it.next();
             List<Object> fields = t.getAll();
-            Object[] expected = results.get((String)fields.get(0));
+            Object[] expected = results.get(fields.get(0));
             int i = 0;
             for (Object field : fields) {
                 if (i == 1) {
@@ -439,7 +438,7 @@ public class TestCombiner {
                         "pig1\t20\t3.1" };
 
         Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
-        PigServer pigServer = new PigServer(execType, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
         pigServer.registerQuery("b = group a by name;");
         pigServer.registerQuery("c = foreach b  {" +
@@ -460,7 +459,7 @@ public class TestCombiner {
         while (it.hasNext()) {
             Tuple t = it.next();
             List<Object> fields = t.getAll();
-            Object[] expected = results.get((String)fields.get(0));
+            Object[] expected = results.get(fields.get(0));
             int i = 0;
             for (Object field : fields) {
                 if (i == 1) {
@@ -498,7 +497,7 @@ public class TestCombiner {
         try {
             Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
 
-            PigServer pigServer = new PigServer(execType, cluster.getProperties());
+            PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
             pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
             pigServer.registerQuery("b = group a by name;");
             pigServer.registerQuery("c = foreach b generate group, SUM(a.age), a;");
@@ -520,6 +519,7 @@ public class TestCombiner {
 
     public static class JiraPig1030 extends EvalFunc<DataBag> {
 
+        @Override
         public DataBag exec(Tuple input) throws IOException {
             return new DefaultDataBag();
         }
@@ -543,7 +543,7 @@ public class TestCombiner {
 
         try {
             Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
-            PigServer pigServer = new PigServer(execType, cluster.getProperties());
+            PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
             pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
             pigServer.registerQuery("b = group a all;");
             pigServer.registerQuery("c = foreach b  {" +

Added: pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java (added)
+++ pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestCustomPartitioner {
+
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+    private PigServer pigServer;
+
+    TupleFactory mTf = TupleFactory.getInstance();
+    BagFactory mBf = BagFactory.getInstance();
+
+    @Before
+    public void setUp() throws Exception{
+        FileLocalizer.setR(new Random());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
+    // See PIG-282
+    @Test
+    @Ignore
+    // Fails with Tez - DefaultSorter.java Illegal partition for Null: false index: 0 1 (-1), TotalPartitions: 0
+    public void testCustomPartitionerParseJoins() throws Exception{
+        String[] input = {
+                "1\t3",
+                "1\t2"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
+
+        // Custom Partitioner is not allowed for skewed joins; this will throw a FrontendException
+        try {
+            pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
+            pigServer.registerQuery("B = ORDER A by $0;");
+            pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+            // control should not reach here
+            Assert.fail("Skewed join cannot accept a custom partitioner");
+        } catch (FrontendException e) {
+            Assert.assertTrue(e.getMessage().contains("Custom Partitioner is not supported for skewed join"));
+        }
+
+        pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        Iterator<Tuple> iter = pigServer.openIterator("hash");
+
+        List<String> expected = new ArrayList<String>();
+        expected.add("(1,3,1,2)");
+        expected.add("(1,3,1,3)");
+        expected.add("(1,2,1,2)");
+        expected.add("(1,2,1,3)");
+        Collections.sort(expected);
+
+        List<String> actual = new ArrayList<String>();
+        while (iter.hasNext()) {
+            actual.add(iter.next().toString());
+        }
+        Collections.sort(actual);
+
+        Assert.assertEquals(expected, actual);
+
+        // No checks are made for merged and replicated joins as they are compiled to a map only job
+        // No frontend error checking has been added for these jobs, hence not adding any test cases
+        // Manually tested the sanity once. Above test should cover the basic sanity of the scenario
+
+        Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
+    }
+
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerGroups() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "3\t1",
+                "4\t1"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
+
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
+
+        // Note that for a map reduce job, the total number of partitions is the
+        // same as the number of reduce tasks for the job. Hence we need a case where
+        // we get more than one reduce task, so that the partitioner actually matters.
+        // The following logic assumes we get 2 reduce tasks, so the expectations can
+        // be hard-coded. SimpleCustomPartitioner3 simply returns '1' (the second
+        // reducer) for all inputs when the number of partitions is bigger than 1.
+        //
+        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+
+        pigServer.store("B", "tmp_testCustomPartitionerGroups");
+
+        new File("tmp_testCustomPartitionerGroups").mkdir();
+
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
+        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
+        String line = null;
+        while ((line = reader.readLine()) != null) {
+            Assert.fail("Partition 0 should be empty. Most likely Custom Partitioner was not used.");
+        }
+        reader.close();
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
+        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
+        line = null;
+        int count=0;
+        while ((line = reader.readLine()) != null) {
+            // all outputs should go to partition 1 (with SimpleCustomPartitioner3)
+            count++;
+        }
+        reader.close();
+        Assert.assertEquals(4, count);
+        Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
+        Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
+        Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
+    }
+
+    // See PIG-3385
+    @Test
+    public void testCustomPartitionerDistinct() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "1\t1",
+                "3\t1",
+                "4\t1",
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
+
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+        pigServer.store("B", "tmp_testCustomPartitionerDistinct");
+
+        new File("tmp_testCustomPartitionerDistinct").mkdir();
+
+        // SimpleCustomPartitioner3 simply partitions all inputs to the *second* reducer
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00000", "tmp_testCustomPartitionerDistinct/part-r-00000");
+        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00000"));
+        String line = null;
+        while((line = reader.readLine()) != null) {
+            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
+        }
+        reader.close();
+        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00001", "tmp_testCustomPartitionerDistinct/part-r-00001");
+        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00001"));
+        line = null;
+        int count=0;
+        while((line = reader.readLine()) != null) {
+            //all outputs should come to partion 1 (with SimpleCustomPartitioner3)
+            count++;
+        }
+        reader.close();
+        Assert.assertEquals(4, count);
+        Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
+        Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
+        Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
+    }
+
+    // See PIG-282
+    // TODO: CROSS is not implemented in Tez yet
+    @Test
+    @Ignore
+    public void testCustomPartitionerCross() throws Exception {
+        String[] input = {
+                "1\t3",
+                "1\t2",
+        };
+
+        Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
+        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
+        pigServer.registerQuery("B = ORDER A by $0;");
+        pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        Tuple t;
+
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3,1,2)");
+        results.add("(1,3,1,3)");
+        results.add("(1,2,1,2)");
+        results.add("(1,2,1,3)");
+
+        for (int i = 0; i < 4; i++) {
+            Assert.assertTrue(iter.hasNext());
+            t = iter.next();
+            Assert.assertEquals(4, t.size());
+            Assert.assertTrue(results.contains(t.toString()));
+        }
+
+        Util.deleteFile(cluster, "table_testCustomPartitionerCross");
+    }
+}

Modified: pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java Thu Jan 23 01:06:21 2014
@@ -492,192 +492,6 @@ public class TestEvalPipeline2 {
         Util.deleteFile(cluster, "table_testNestedDescSort");
     }
     
-    // See PIG-282
-    @Test
-    public void testCustomPartitionerParseJoins() throws Exception{
-    	String[] input = {
-                "1\t3",
-                "1\t2"
-        };
-        Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
-        
-        // Custom Partitioner is not allowed for skewed joins, will throw a ExecException 
-        try {
-            pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
-            pigServer.registerQuery("B = ORDER A by $0;");
-        	pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
-        	//control should not reach here
-        	Assert.fail("Skewed join cannot accept a custom partitioner");
-        } catch(FrontendException e) {
-        	Assert.assertTrue( e.getMessage().contains( "Custom Partitioner is not supported for skewed join" ) );
-		}
-        
-        pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
-        Iterator<Tuple> iter = pigServer.openIterator("hash");
-        Tuple t;
-        
-        Collection<String> results = new HashSet<String>();
-        results.add("(1,3,1,2)");
-        results.add("(1,3,1,3)");
-        results.add("(1,2,1,2)");
-        results.add("(1,2,1,3)");
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        // No checks are made for merged and replicated joins as they are compiled to a map only job 
-        // No frontend error checking has been added for these jobs, hence not adding any test cases 
-        // Manually tested the sanity once. Above test should cover the basic sanity of the scenario 
-        
-        Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
-    }
-    
-    // See PIG-282
-    @Test
-    public void testCustomPartitionerGroups() throws Exception{
-        String[] input = {
-                "1\t1",
-                "2\t1",
-                "3\t1",
-                "4\t1"
-        };
-        Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
-        
-        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
-        
-        // It should be noted that for a map reduce job, the total number of partitions 
-        // is the same as the number of reduce tasks for the job. Hence we need to find a case wherein 
-        // we will get more than one reduce job so that we can use the partitioner.
-        // The following logic assumes that we get 2 reduce jobs, so that we can hard-code the logic.
-        // SimpleCustomPartitioner3 simply returns '1' (second reducer) for all inputs when
-        // partition number is bigger than 1.
-        //
-        pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
-        
-        pigServer.store("B", "tmp_testCustomPartitionerGroups");
-        
-        new File("tmp_testCustomPartitionerGroups").mkdir();
-        
-        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
-        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
-        String line = null;
-        while((line = reader.readLine()) != null) {
-            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
-        }
-        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
-        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
-        line = null;
-        int count=0;
-        while((line = reader.readLine()) != null) {
-            //all outputs should come to partion 1 (with SimpleCustomPartitioner3)
-            count++;
-        }
-        Assert.assertEquals(4, count);
-        Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
-        Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
-        Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
-    }
-
-    // See PIG-3385
-    @Test
-    public void testCustomPartitionerDistinct() throws Exception{
-        String[] input = {
-                "1\t1",
-                "2\t1",
-                "1\t1",
-                "3\t1",
-                "4\t1",
-        };
-        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
-
-        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
-        pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
-        pigServer.store("B", "tmp_testCustomPartitionerDistinct");
-
-        new File("tmp_testCustomPartitionerDistinct").mkdir();
-
-        // SimpleCustomPartitioner3 simply partition all inputs to *second* reducer
-        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00000", "tmp_testCustomPartitionerDistinct/part-r-00000");
-        BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00000"));
-        String line = null;
-        while((line = reader.readLine()) != null) {
-            Assert.fail("Partition 0 should be empty.  Most likely Custom Partitioner was not used.");
-        }
-        Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00001", "tmp_testCustomPartitionerDistinct/part-r-00001");
-        reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00001"));
-        line = null;
-        int count=0;
-        while((line = reader.readLine()) != null) {
-            //all outputs should come to partion 1 (with SimpleCustomPartitioner3)
-            count++;
-        }
-        Assert.assertEquals(4, count);
-        Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
-        Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
-        Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
-    }
-
-    // See PIG-282
-    @Test
-    public void testCustomPartitionerCross() throws Exception{
-    	String[] input = {
-                "1\t3",
-                "1\t2",
-        };
-    	
-        Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
-        pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
-        pigServer.registerQuery("B = ORDER A by $0;");
-        pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
-        Iterator<Tuple> iter = pigServer.openIterator("C");
-        Tuple t;
-        
-        Collection<String> results = new HashSet<String>();
-        results.add("(1,3,1,2)");
-        results.add("(1,3,1,3)");
-        results.add("(1,2,1,2)");
-        results.add("(1,2,1,3)");
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
-        Assert.assertTrue(t.size()==4);
-        Assert.assertTrue(results.contains(t.toString()));
-        
-        Util.deleteFile(cluster, "table_testCustomPartitionerCross");
-    }
-    
     // See PIG-972
     @Test
     public void testDescribeNestedAlias() throws Exception{

Modified: pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigServer.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigServer.java Thu Jan 23 01:06:21 2014
@@ -535,16 +535,16 @@ public class TestPigServer {
         pig.registerQuery("c = group b by site;");
         pig.registerQuery("d = foreach c generate FLATTEN($1);");
         pig.registerQuery("e = group d by $2;");
-        
+
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
         pig.explain("e", "xml", true, false, ps, ps, null, null);
-        
+
         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
         DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
         DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
         Document doc = dBuilder.parse(bais);
-        
+
         //Verify Logical and Physical Plans aren't supported.
         NodeList logicalPlan = doc.getElementsByTagName("logicalPlan");
         assertEquals(1, logicalPlan.getLength());
@@ -552,20 +552,20 @@ public class TestPigServer {
         NodeList physicalPlan = doc.getElementsByTagName("physicalPlan");
         assertEquals(1, physicalPlan.getLength());
         assertTrue(physicalPlan.item(0).getTextContent().contains("Not Supported"));
-        
+
         //Verify we have two loads and one is temporary
         NodeList loads = doc.getElementsByTagName("POLoad");
         assertEquals(2, loads.getLength());
-        
+
         boolean sawTempLoad = false;
         boolean sawNonTempLoad = false;
         for (int i = 0; i < loads.getLength(); i++) {
             Boolean isTempLoad = null;
             boolean hasAlias = false;
-            
+
             Node poLoad = loads.item(i);
             NodeList children = poLoad.getChildNodes();
-            
+
             for (int j = 0; j < children.getLength(); j++) {
                 Node child = children.item(j);
                 if (child.getNodeName().equals("alias")) {
@@ -579,7 +579,7 @@ public class TestPigServer {
                     }
                 }
             }
-            
+
             if (isTempLoad == null) {
                 fail("POLoad elements should have isTmpLoad child node.");
             } else if (isTempLoad && hasAlias) {
@@ -587,11 +587,11 @@ public class TestPigServer {
             } else if (!isTempLoad && !hasAlias) {
                 fail("Non temporary loads should be associated with alias.");
             }
-            
+
             sawTempLoad = sawTempLoad || isTempLoad;
             sawNonTempLoad = sawNonTempLoad || !isTempLoad;
         }
-        
+
         assertTrue(sawTempLoad && sawNonTempLoad);
     }
 

Modified: pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java Thu Jan 23 01:06:21 2014
@@ -27,45 +27,49 @@ import java.io.PrintStream;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SecondaryKeyOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.VisitorException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestSecondarySort {
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    private PigServer pigServer;
-
-    static PigContext pc;
-    static {
-        pc = new PigContext(ExecType.MAPREDUCE, MiniCluster.buildCluster().getProperties());
-        try {
-            pc.connect();
-        } catch (ExecException e) {
-            throw new RuntimeException(e);
-        }
-    }
+public abstract class TestSecondarySort {
+    protected static MiniGenericCluster cluster = null;
+    protected PigServer pigServer;
+    protected static PigContext pc;
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
+        cluster = null;
     }
 
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        if (cluster == null) {
+            cluster = getCluster();
+            pc = new PigContext(cluster.getExecType(), cluster.getProperties());
+            try {
+                pc.connect();
+            } catch (ExecException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        pigServer = new PigServer(pc);
     }
 
+    public abstract SecondaryKeyOptimizer visitSecondaryKeyOptimizer(
+            String query) throws Exception, VisitorException;
+
+    public abstract MiniGenericCluster getCluster();
+
     @Test
     public void testDistinctOptimization1() throws Exception {
         // Distinct on one entire input
@@ -74,15 +78,11 @@ public class TestSecondarySort {
         "C = foreach B { D = distinct A; generate group, D;};"+
 
         "store C into 'output';");
-        PhysicalPlan pp = Util.buildPp(pigServer, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
-        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-        so.visit();
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
 
-        assertEquals(1, so.getNumMRUseSecondaryKey());
+        assertEquals(1, so.getNumUseSecondaryKey());
         assertEquals(0, so.getNumSortRemoved());
-        assertEquals(1, so.getDistinctChanged());
+        assertEquals(1, so.getNumDistinctChanged());
     }
 
     @Test
@@ -93,15 +93,11 @@ public class TestSecondarySort {
         "C = foreach B { D = limit A 10; E = order D by $1; generate group, E;};" +
 
         "store C into 'output';");
-        PhysicalPlan pp = Util.buildPp(pigServer, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
 
-        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-        so.visit();
-
-        assertEquals(1, so.getNumMRUseSecondaryKey());
+        assertEquals(1, so.getNumUseSecondaryKey());
         assertEquals(1, so.getNumSortRemoved());
-        assertEquals(0, so.getDistinctChanged());
+        assertEquals(0, so.getNumDistinctChanged());
     }
 
     @Test
@@ -112,15 +108,11 @@ public class TestSecondarySort {
         "C = foreach B { D = limit A 10; E = order D by $0; generate group, E;};" +
 
         "store C into 'output';");
-        PhysicalPlan pp = Util.buildPp(pigServer, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
-        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-        so.visit();
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
 
-        assertEquals(0, so.getNumMRUseSecondaryKey());
+        assertEquals(0, so.getNumUseSecondaryKey());
         assertEquals(1, so.getNumSortRemoved());
-        assertEquals(0, so.getDistinctChanged());
+        assertEquals(0, so.getNumDistinctChanged());
     }
 
     @Test
@@ -131,15 +123,11 @@ public class TestSecondarySort {
         "C = foreach B { D = limit A 10; E = order D by $1; F = order E by $0; generate group, F;};"+
 
         "store C into 'output';");
-        PhysicalPlan pp = Util.buildPp(pigServer, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
 
-        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-        so.visit();
-
-        assertEquals(1, so.getNumMRUseSecondaryKey());
+        assertEquals(1, so.getNumUseSecondaryKey());
         assertEquals(2, so.getNumSortRemoved());
-        assertEquals(0, so.getDistinctChanged());
+        assertEquals(0, so.getNumDistinctChanged());
     }
 
     @Test
@@ -150,15 +138,11 @@ public class TestSecondarySort {
         "C = foreach B { D = limit A 10; E = order D by $0, $1, $2; generate group, E;};" +
 
         "store C into 'output';");
-        PhysicalPlan pp = Util.buildPp(pigServer, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
-        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-        so.visit();
 
-        assertEquals(1, so.getNumMRUseSecondaryKey());
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+        assertEquals(1, so.getNumUseSecondaryKey());
         assertEquals(1, so.getNumSortRemoved());
-        assertEquals(0, so.getDistinctChanged());
+        assertEquals(0, so.getNumDistinctChanged());
     }
 
     @Test
@@ -168,15 +152,11 @@ public class TestSecondarySort {
         "B = group A by $0;" +
         "C = foreach B { D = limit A 10; E = order D by $1; F = order E by $2; generate group, F;};" +
         "store C into 'output';");
-        PhysicalPlan pp = Util.buildPp(pigServer, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
-        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-        so.visit();
-
-        assertEquals(1, so.getNumMRUseSecondaryKey());
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+        assertEquals(1, so.getNumUseSecondaryKey());
         assertEquals(1, so.getNumSortRemoved());
-        assertEquals(0, so.getDistinctChanged());
+        assertEquals(0, so.getNumDistinctChanged());
     }
 
     @Test
@@ -187,15 +167,11 @@ public class TestSecondarySort {
         "C = foreach B { D = order A by $0 desc; generate group, D;};" +
 
         "store C into 'output';");
-        PhysicalPlan pp = Util.buildPp(pigServer, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
-        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-        so.visit();
 
-        assertEquals(1, so.getNumMRUseSecondaryKey());
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+        assertEquals(1, so.getNumUseSecondaryKey());
         assertEquals(1, so.getNumSortRemoved());
-        assertEquals(0, so.getDistinctChanged());
+        assertEquals(0, so.getNumDistinctChanged());
     }
 
     @Test
@@ -206,15 +182,11 @@ public class TestSecondarySort {
         "C = foreach B { D = order A by $0, $1 desc; generate group, D;};" +
 
         "store C into 'output';");
-        PhysicalPlan pp = Util.buildPp(pigServer, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
-        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-        so.visit();
 
-        assertEquals(1, so.getNumMRUseSecondaryKey());
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+        assertEquals(1, so.getNumUseSecondaryKey());
         assertEquals(1, so.getNumSortRemoved());
-        assertEquals(0, so.getDistinctChanged());
+        assertEquals(0, so.getNumDistinctChanged());
     }
 
     // See PIG-1193
@@ -226,15 +198,11 @@ public class TestSecondarySort {
         "C = foreach B { D = order A by $0 desc; generate DIFF(D, D);};" +
 
         "store C into 'output';");
-        PhysicalPlan pp = Util.buildPp(pigServer, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
-        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
-        so.visit();
-
-        assertEquals(1, so.getNumMRUseSecondaryKey());
+        SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+        assertEquals(1, so.getNumUseSecondaryKey());
         assertEquals(2, so.getNumSortRemoved());
-        assertEquals(0, so.getDistinctChanged());
+        assertEquals(0, so.getNumDistinctChanged());
     }
 
     @Test
@@ -403,25 +371,78 @@ public class TestSecondarySort {
 
     @Test
     public void testNestedSortMultiQueryEndToEnd1() throws Exception {
-        pigServer.setBatchOn();
-        Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd",
-                "testNestedSortMultiQueryEndToEnd1-input.txt");
-        pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd1-input.txt'"
-                + " using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
-        pigServer.registerQuery("b = group a by uname parallel 2;");
-        pigServer.registerQuery("c = group a by gid parallel 2;");
-        pigServer.registerQuery("d = foreach b generate SUM(a.gid);");
-        pigServer.registerQuery("e = foreach c { f = order a by uid; generate group, f; };");
-        pigServer.registerQuery("store d into '/tmp/output1';");
-        pigServer.registerQuery("store e into '/tmp/output2';");
-
-        List<ExecJob> jobs = pigServer.executeBatch();
-        for (ExecJob job : jobs) {
-            assertEquals(ExecJob.JOB_STATUS.COMPLETED, job.getStatus());
+        try {
+            pigServer.setBatchOn();
+            Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd",
+                    "testNestedSortMultiQueryEndToEnd1-input.txt");
+            pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd1-input.txt'"
+                    + " using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            pigServer.registerQuery("b = group a by uname parallel 2;");
+            pigServer.registerQuery("c = group a by gid parallel 2;");
+            pigServer.registerQuery("d = foreach b generate SUM(a.gid);");
+            pigServer.registerQuery("e = foreach c { f = order a by uid; generate group, f; };");
+            pigServer.registerQuery("store d into '/tmp/output1';");
+            pigServer.registerQuery("store e into '/tmp/output2';");
+
+            List<ExecJob> jobs = pigServer.executeBatch();
+            for (ExecJob job : jobs) {
+                assertEquals(ExecJob.JOB_STATUS.COMPLETED, job.getStatus());
+            }
+        } finally {
+            FileLocalizer.delete("/tmp/output1", pigServer.getPigContext());
+            FileLocalizer.delete("/tmp/output2", pigServer.getPigContext());
+            Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd1-input.txt");
+        }
+    }
+
+    @Test
+    public void testNestedSortMultiQueryEndToEnd2() throws Exception {
+        File input = Util.createTempFileDelOnExit("test", "txt");
+        PrintStream ps1 = new PrintStream(new FileOutputStream(input));
+        ps1.println("2\t2\t4");
+        ps1.println("2\t3\t4");
+        ps1.println("1\t2\t3");
+        ps1.println("1\t3\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("1\t2\t5");
+        ps1.close();
+        Util.copyFromLocalToCluster(cluster, input.getCanonicalPath(), "testNestedSortMultiQueryEndToEnd2-input.txt");
+
+        try {
+            pigServer.setBatchOn();
+            pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd2-input.txt' as (a0, a1, a2);");
+            pigServer.registerQuery("b = group a by a0 parallel 2;");
+            pigServer.registerQuery("c = group a by a1 parallel 2;");
+            pigServer.registerQuery("d = foreach b generate group, SUM(a.a2);");
+            pigServer.registerQuery("e = foreach c { f = order a by a0,a2; generate group, f; };");
+            pigServer.registerQuery("store d into '/tmp/output1';");
+            pigServer.registerQuery("store e into '/tmp/output2';");
+
+            List<ExecJob> jobs = pigServer.executeBatch();
+            for (ExecJob job : jobs) {
+                assertEquals(ExecJob.JOB_STATUS.COMPLETED, job.getStatus());
+            }
+            pigServer.registerQuery("D = load '/tmp/output1';");
+            pigServer.registerQuery("E = load '/tmp/output2';");
+            pigServer.executeBatch();
+
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+            Schema s = pigServer.dumpSchema("D");
+            String[] expectedD = { "(1,16.0)", "(2,8.0)" };
+            Util.checkQueryOutputsAfterSortRecursive(iter, expectedD,
+                    org.apache.pig.newplan.logical.Util.translateSchema(s));
+
+            iter = pigServer.openIterator("E");
+            s = pigServer.dumpSchema("E");
+            String[] expectedE = { "(2,{(1,2,3),(1,2,4),(1,2,5),(2,2,4)})", "(3,{(1,3,4),(2,3,4)})" };
+            Util.checkQueryOutputsAfterSortRecursive(iter, expectedE,
+                    org.apache.pig.newplan.logical.Util.translateSchema(s));
+        } finally {
+            FileLocalizer.delete("/tmp/output1", pigServer.getPigContext());
+            FileLocalizer.delete("/tmp/output2", pigServer.getPigContext());
+            Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd2-input.txt");
         }
-        FileLocalizer.delete("/tmp/output1", pigServer.getPigContext());
-        FileLocalizer.delete("/tmp/output2", pigServer.getPigContext());
-        Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd1-input.txt");
     }
 
     // See PIG-1978

Added: pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java (added)
+++ pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SecondaryKeyOptimizerMR;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class TestSecondarySortMR extends TestSecondarySort {
+
+    public TestSecondarySortMR() {
+        super();
+    }
+
+    @Override
+    public MiniGenericCluster getCluster() {
+        return MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_MR);
+    }
+
+    @Override
+    public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query)
+            throws Exception, VisitorException {
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+        SecondaryKeyOptimizerMR so = new SecondaryKeyOptimizerMR(mrPlan);
+        so.visit();
+        return so;
+    }
+
+}
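+
+// The matching Tez subclass is not shown in this portion of the diff. Assuming
+// the branch follows the same template, it would look roughly like the sketch
+// below; the class name, the Util.buildTezPlan helper, and SecondaryKeyOptimizerTez
+// are assumptions here, not confirmed by this commit excerpt:
+//
+//   public class TestSecondarySortTez extends TestSecondarySort {
+//       @Override
+//       public MiniGenericCluster getCluster() {
+//           return MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
+//       }
+//
+//       @Override
+//       public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query)
+//               throws Exception, VisitorException {
+//           PhysicalPlan pp = Util.buildPp(pigServer, query);
+//           TezOperPlan tezPlan = Util.buildTezPlan(pp, pc);                     // assumed helper
+//           SecondaryKeyOptimizerTez so = new SecondaryKeyOptimizerTez(tezPlan); // assumed class
+//           so.visit();
+//           return so;
+//       }
+//   }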
+

Modified: pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java Thu Jan 23 01:06:21 2014
@@ -56,11 +56,11 @@ public class TestSkewedJoin {
     private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
 
     private PigServer pigServer;
-    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple", "5");
         pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", "0.01");
         createFiles();