You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/01/23 02:06:22 UTC
svn commit: r1560565 [2/3] - in /pig/branches/tez:
shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/optimizer/ src/org/apach...
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ColumnChainInfo;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ColumnInfo;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SortKeyInfo;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Utility implementing the secondary-key optimization for one map/reduce plan
+ * pair.
+ *
+ * <p>The map plan must end in a single POLocalRearrange (or a POUnion over
+ * POLocalRearranges) and the reduce plan must start with a POPackage that
+ * leads, possibly through POFilter/POLimit, to a POForEach. The nested
+ * POSort/PODistinct operators of that POForEach are inspected: those whose key
+ * is covered by the main key, or by the chosen secondary key, are rewritten
+ * (POSort removed, PODistinct replaced by POSortedDistinct). The first nested
+ * sort/distinct key that is not covered becomes the secondary key, which is
+ * installed on the map-side POLocalRearrange and on the reduce-side packager.
+ */
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerUtil {
+    private static final Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
+
+    private SecondaryKeyOptimizerUtil() {
+        // Static utility class; not meant to be instantiated.
+    }
+
+    /**
+     * A nested operator scheduled for rewrite, together with the inner plan
+     * that contains it and the POForEach that owns that inner plan.
+     */
+    private static class POToChange {
+        final PhysicalOperator oper;
+
+        final PhysicalPlan plan;
+
+        final POForEach forEach;
+
+        POToChange(PhysicalOperator oper, PhysicalPlan plan, POForEach forEach) {
+            this.oper = oper;
+            this.plan = plan;
+            this.forEach = forEach;
+        }
+    }
+
+    /**
+     * Outcome of {@link SecondaryKeyOptimizerUtil#applySecondaryKeySort}:
+     * counters of the rewrites performed and, when a secondary key was chosen,
+     * its per-column sort order.
+     */
+    public static class SecondaryKeyOptimizerInfo {
+
+        private int numSortRemoved = 0;
+        private int numDistinctChanged = 0;
+        private int numUseSecondaryKey = 0;
+        private boolean useSecondaryKey = false;
+        // Per-column order flags of the secondary key, as returned by SortKeyInfo.getAscs()
+        private boolean[] secondarySortOrder;
+
+        public int getNumSortRemoved() {
+            return numSortRemoved;
+        }
+
+        public void incrementNumSortRemoved() {
+            this.numSortRemoved++;
+        }
+
+        public int getNumDistinctChanged() {
+            return numDistinctChanged;
+        }
+
+        public void incrementNumDistinctChanged() {
+            this.numDistinctChanged++;
+        }
+
+        public int getNumUseSecondaryKey() {
+            return numUseSecondaryKey;
+        }
+
+        public void incrementNumUseSecondaryKey() {
+            this.numUseSecondaryKey++;
+        }
+
+        public boolean isUseSecondaryKey() {
+            return useSecondaryKey;
+        }
+
+        public void setUseSecondaryKey(boolean useSecondaryKey) {
+            this.useSecondaryKey = useSecondaryKey;
+        }
+
+        public boolean[] getSecondarySortOrder() {
+            return secondarySortOrder;
+        }
+
+        public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+            this.secondarySortOrder = secondarySortOrder;
+        }
+
+    }
+
+    /**
+     * Builds the main sort key description from the key plans of a map-side
+     * POLocalRearrange.
+     *
+     * <p>Each key plan made of a single chain of POProjects contributes one
+     * column chain, prefixed with the rearrange's (masked) input index. Plans
+     * with more than one root, or containing any operator other than POProject
+     * (constant, UDF, ...), are skipped. A chain stops early at a projection
+     * that has more than one successor.
+     *
+     * @param rearrange the map-side local rearrange
+     * @return the key description, or {@code null} if some key plan has no roots
+     */
+    private static SortKeyInfo getSortKeyInfo(POLocalRearrange rearrange) throws ExecException {
+        SortKeyInfo result = new SortKeyInfo();
+        List<PhysicalPlan> plans = rearrange.getPlans();
+        nextPlan: for (int i = 0; i < plans.size(); i++) {
+            PhysicalPlan plan = plans.get(i);
+            ColumnChainInfo columnChainInfo = new ColumnChainInfo();
+            if (plan.getRoots() == null) {
+                log.debug("POLocalRearrange plan is null");
+                return null;
+            } else if (plan.getRoots().size() != 1) {
+                // POLocalRearrange plan contains more than 1 root. Probably there
+                // is an expression operator in the local rearrangement plan,
+                // skip this plan
+                continue nextPlan;
+            }
+
+            List<Integer> columns = new ArrayList<Integer>();
+            columns.add(rearrange.getIndex() & PigNullableWritable.idxSpace);
+
+            // The first item inside columnChainInfo is set to type Tuple. This
+            // value is not actually in use, but it intends to match the type of
+            // POProject in reduce side
+            columnChainInfo.insert(columns, DataType.TUPLE);
+
+            PhysicalOperator node = plan.getRoots().get(0);
+            while (node != null) {
+                if (!(node instanceof POProject)) {
+                    // constant, UDF, we will pass
+                    continue nextPlan;
+                }
+                POProject project = (POProject) node;
+                if (project.isProjectToEnd()) {
+                    columnChainInfo.insert(project.getStartCol(), project.getResultType());
+                } else {
+                    columnChainInfo.insert(project.getColumns(), project.getResultType());
+                }
+
+                List<PhysicalOperator> succs = plan.getSuccessors(node);
+                if (succs == null) {
+                    node = null;
+                } else if (succs.size() != 1) {
+                    log.debug(node + " have more than 1 successor");
+                    node = null;
+                } else {
+                    node = succs.get(0);
+                }
+            }
+            // Let's assume all main key is sorted ascendant, we can further
+            // optimize it to match one of the nested sort/distinct key, because
+            // we do not really care about how cogroup key are sorted; But it may
+            // not be the case if sometime we switch all the comparator to byte
+            // comparator, so just leave it as it is for now
+            result.insertColumnChainInfo(i, columnChainInfo, true);
+        }
+        return result;
+    }
+
+    /**
+     * Applies the secondary-key optimization to one map/reduce plan pair,
+     * modifying both plans in place.
+     *
+     * @param mapPlan map-side plan; its single leaf must be a POLocalRearrange
+     *        or a POUnion over POLocalRearranges
+     * @param reducePlan reduce-side plan; its single root must be a POPackage
+     *        (not using a JoinPackager) leading, possibly through POFilter and
+     *        POLimit, to a POForEach
+     * @return counters describing the rewrites, or {@code null} when the plans
+     *         do not have the required shape (in that case nothing is modified)
+     * @throws VisitorException if visiting or rewriting the plans fails, or if
+     *         the secondary key would sort on columns from different inputs
+     */
+    public static SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan,
+            PhysicalPlan reducePlan) throws VisitorException {
+        log.trace("Entering SecondaryKeyOptimizerUtil.applySecondaryKeySort");
+        SecondaryKeyOptimizerInfo secKeyOptimizerInfo = new SecondaryKeyOptimizerInfo();
+
+        List<PhysicalOperator> mapLeaves = mapPlan.getLeaves();
+        if (mapLeaves == null || mapLeaves.size() != 1) {
+            log.debug("Expected map to have single leaf! Skip secondary key optimizing");
+            return null;
+        }
+        PhysicalOperator mapLeaf = mapLeaves.get(0);
+
+        // Figure out the main key of the map-reduce job from POLocalRearrange
+        List<SortKeyInfo> sortKeyInfos = collectMainSortKeyInfos(mapPlan, mapLeaf);
+        if (sortKeyInfos == null) {
+            return null;
+        }
+
+        if (reducePlan.isEmpty()) {
+            log.debug("Reduce plan is empty, skip secondary key optimizing");
+            return null;
+        }
+
+        List<PhysicalOperator> reduceRoots = reducePlan.getRoots();
+        if (reduceRoots.size() != 1) {
+            log.debug("Expected reduce to have single root, skip secondary key optimizing");
+            return null;
+        }
+
+        PhysicalOperator root = reduceRoots.get(0);
+        if (!(root instanceof POPackage)) {
+            log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+            return null;
+        }
+
+        POForEach foreach = findReduceForEach(reducePlan, root);
+        if (foreach == null) {
+            return null;
+        }
+
+        // Visit inner plans to figure out the sort order for distinct / sort.
+        // The secondary key chosen while visiting one inner plan constrains the
+        // following ones, since there can be only one secondary key.
+        List<POToChange> sortsToRemove = new ArrayList<POToChange>();
+        List<POToChange> distinctsToChange = new ArrayList<POToChange>();
+        SortKeyInfo secondarySortKeyInfo = null;
+        for (PhysicalPlan innerPlan : foreach.getInputPlans()) {
+            SecondaryKeyDiscover innerPlanDiscover = new SecondaryKeyDiscover(
+                    innerPlan, sortKeyInfos, secondarySortKeyInfo);
+            try {
+                innerPlanDiscover.process();
+            } catch (FrontendException e) {
+                int errorCode = 2213;
+                throw new VisitorException("Error visiting inner plan for ForEach", errorCode, e);
+            }
+            secondarySortKeyInfo = innerPlanDiscover.getSecondarySortKeyInfo();
+            for (POSort sort : innerPlanDiscover.getSortsToRemove()) {
+                sortsToRemove.add(new POToChange(sort, innerPlan, foreach));
+            }
+            for (PODistinct distinct : innerPlanDiscover.getDistinctsToChange()) {
+                distinctsToChange.add(new POToChange(distinct, innerPlan, foreach));
+            }
+        }
+
+        try {
+            replaceDistincts(distinctsToChange, secKeyOptimizerInfo);
+            removeSorts(sortsToRemove, secKeyOptimizerInfo);
+        } catch (PlanException e) {
+            int errorCode = 2202;
+            throw new VisitorException(
+                    "Error change distinct/sort to use secondary key optimizer",
+                    errorCode, e);
+        }
+
+        if (secondarySortKeyInfo != null) {
+            // Adjust POLocalRearrange, POPackage, MapReduceOper to use the
+            // secondary key
+            secKeyOptimizerInfo.incrementNumUseSecondaryKey();
+            secKeyOptimizerInfo.setUseSecondaryKey(true);
+            secKeyOptimizerInfo.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
+            int indexOfRearrangeToChange = getSecondaryKeyInputIndex(secondarySortKeyInfo);
+            configureMapSideSecondaryKey(mapPlan, mapLeaf, indexOfRearrangeToChange,
+                    secondarySortKeyInfo);
+            POPackage pack = (POPackage) root;
+            pack.getPkgr().setUseSecondaryKey(true);
+        }
+        return secKeyOptimizerInfo;
+    }
+
+    /**
+     * Collects the main sort key of every map-side POLocalRearrange feeding the
+     * shuffle: the map leaf itself, or the POLocalRearrange predecessors of a
+     * POUnion leaf (other predecessors are ignored).
+     *
+     * @return the key descriptions, or {@code null} if the optimization must be
+     *         skipped
+     */
+    private static List<SortKeyInfo> collectMainSortKeyInfos(PhysicalPlan mapPlan,
+            PhysicalOperator mapLeaf) {
+        List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
+        try {
+            if (mapLeaf instanceof POLocalRearrange) {
+                SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) mapLeaf);
+                if (sortKeyInfo == null) {
+                    log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+                    return null;
+                }
+                sortKeyInfos.add(sortKeyInfo);
+            } else if (mapLeaf instanceof POUnion) {
+                List<PhysicalOperator> preds = mapPlan.getPredecessors(mapLeaf);
+                if (preds == null) {
+                    log.debug("POUnion in map leaf has no predecessors, skip secondary key optimizing");
+                    return null;
+                }
+                for (PhysicalOperator pred : preds) {
+                    if (!(pred instanceof POLocalRearrange)) {
+                        continue;
+                    }
+                    SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) pred);
+                    if (sortKeyInfo == null) {
+                        log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+                        return null;
+                    }
+                    sortKeyInfos.add(sortKeyInfo);
+                }
+            } else {
+                log.debug("Cannot find POLocalRearrange or POUnion in map leaf, skip secondary key optimizing");
+                return null;
+            }
+        } catch (ExecException e) {
+            log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+            return null;
+        }
+        return sortKeyInfos;
+    }
+
+    /**
+     * Walks the single-successor chain starting at the reduce root. It passes
+     * through a POPackage (unless it uses a JoinPackager), POFilter and POLimit.
+     *
+     * @return the first POForEach reached, or {@code null} if the chain ends,
+     *         branches, or reaches any other operator first
+     */
+    private static POForEach findReduceForEach(PhysicalPlan reducePlan, PhysicalOperator root) {
+        PhysicalOperator currentNode = root;
+        while (currentNode != null) {
+            if (currentNode instanceof POForEach) {
+                return (POForEach) currentNode;
+            }
+            boolean passThrough = (currentNode instanceof POPackage
+                    && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager))
+                    || currentNode instanceof POFilter
+                    || currentNode instanceof POLimit;
+            if (!passThrough) {
+                // Skip optimization
+                return null;
+            }
+            List<PhysicalOperator> succs = reducePlan.getSuccessors(currentNode);
+            if (succs == null) {
+                // We didn't find POForEach
+                return null;
+            }
+            if (succs.size() != 1) {
+                log.debug("See multiple output for " + currentNode
+                        + " in reduce plan, skip secondary key optimizing");
+                return null;
+            }
+            currentNode = succs.get(0);
+        }
+        return null;
+    }
+
+    /**
+     * Replaces each PODistinct with a POSortedDistinct, which assumes its input
+     * is already sorted.
+     */
+    private static void replaceDistincts(List<POToChange> distinctsToChange,
+            SecondaryKeyOptimizerInfo info) throws PlanException {
+        for (POToChange distinctToChange : distinctsToChange) {
+            info.incrementNumDistinctChanged();
+            PODistinct oldDistinct = (PODistinct) distinctToChange.oper;
+            String scope = oldDistinct.getOperatorKey().scope;
+            POSortedDistinct newDistinct = new POSortedDistinct(
+                    new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+                    oldDistinct.getRequestedParallelism(),
+                    oldDistinct.getInputs());
+            newDistinct.setInputs(oldDistinct.getInputs());
+            newDistinct.setResultType(oldDistinct.getResultType());
+            distinctToChange.plan.replace(oldDistinct, newDistinct);
+            // NOTE(review): return value is discarded; presumably called so the
+            // POForEach refreshes state derived from its modified inner plan -- confirm
+            distinctToChange.forEach.getLeaves();
+        }
+    }
+
+    /**
+     * Removes each nested POSort. If removing it would leave a non-bag input
+     * where the sort produced a bag, and the successor is not already a
+     * PORelationToExprProject, the sort is replaced by a PORelationToExprProject
+     * that converts tuples into a databag.
+     */
+    private static void removeSorts(List<POToChange> sortsToRemove,
+            SecondaryKeyOptimizerInfo info) throws PlanException {
+        for (POToChange sortToRemove : sortsToRemove) {
+            info.incrementNumSortRemoved();
+            POSort oldSort = (POSort) sortToRemove.oper;
+            String scope = oldSort.getOperatorKey().scope;
+            List<PhysicalOperator> preds = sortToRemove.plan.getPredecessors(sortToRemove.oper);
+            List<PhysicalOperator> succs = sortToRemove.plan.getSuccessors(sortToRemove.oper);
+            // The sort changes the result type to bag, and its successor is not
+            // already a PORelationToExprProject
+            boolean needsBagProject = (preds == null
+                    || (preds.get(0).getResultType() != DataType.BAG
+                            && oldSort.getResultType() == DataType.BAG))
+                    && (succs == null || !(succs.get(0) instanceof PORelationToExprProject));
+            if (needsBagProject) {
+                POProject project = new PORelationToExprProject(
+                        new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+                        oldSort.getRequestedParallelism());
+                project.setInputs(oldSort.getInputs());
+                project.setResultType(DataType.BAG);
+                project.setStar(true);
+                sortToRemove.plan.replace(oldSort, project);
+            } else {
+                sortToRemove.plan.removeAndReconnect(sortToRemove.oper);
+            }
+            // NOTE(review): return value is discarded; presumably called so the
+            // POForEach refreshes state derived from its modified inner plan -- confirm
+            sortToRemove.forEach.getLeaves();
+        }
+    }
+
+    /**
+     * Returns the input index shared by all column chains of the secondary key.
+     * The index is the first column of each chain's first ColumnInfo. Returns -1
+     * if there are no chains.
+     *
+     * @throws VisitorException (error 2203) if the chains refer to different inputs
+     */
+    private static int getSecondaryKeyInputIndex(SortKeyInfo secondarySortKeyInfo)
+            throws VisitorException {
+        int indexOfRearrangeToChange = -1;
+        for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo.getColumnChains()) {
+            ColumnInfo currentColumn = columnChainInfo.getColumnInfos().get(0);
+            int index = currentColumn.getColumns().get(0);
+            if (indexOfRearrangeToChange == -1) {
+                indexOfRearrangeToChange = index;
+            } else if (indexOfRearrangeToChange != index) {
+                int errorCode = 2203;
+                throw new VisitorException("Sort on columns from different inputs.", errorCode);
+            }
+        }
+        return indexOfRearrangeToChange;
+    }
+
+    /**
+     * Enables the secondary key on the map-side POLocalRearrange(s) and installs
+     * the secondary key plan on the one reading input
+     * {@code indexOfRearrangeToChange}.
+     *
+     * @throws VisitorException (error 2214) if a POUnion leaf has no matching
+     *         POLocalRearrange predecessor
+     */
+    private static void configureMapSideSecondaryKey(PhysicalPlan mapPlan,
+            PhysicalOperator mapLeaf, int indexOfRearrangeToChange,
+            SortKeyInfo secondarySortKeyInfo) throws VisitorException {
+        if (mapLeaf instanceof POLocalRearrange) {
+            ((POLocalRearrange) mapLeaf).setUseSecondaryKey(true);
+            setSecondaryPlan(mapPlan, (POLocalRearrange) mapLeaf, secondarySortKeyInfo);
+        } else if (mapLeaf instanceof POUnion) {
+            List<PhysicalOperator> preds = mapPlan.getPredecessors(mapLeaf);
+            boolean found = false;
+            if (preds != null) {
+                for (PhysicalOperator pred : preds) {
+                    // Non-POLocalRearrange inputs were ignored when the main key
+                    // was collected; skip them here too instead of failing the cast
+                    if (!(pred instanceof POLocalRearrange)) {
+                        continue;
+                    }
+                    POLocalRearrange rearrange = (POLocalRearrange) pred;
+                    rearrange.setUseSecondaryKey(true);
+                    // Mask the index the same way getSortKeyInfo does
+                    if ((rearrange.getIndex() & PigNullableWritable.idxSpace) == indexOfRearrangeToChange) {
+                        // Found the POLocalRearrange for the secondary key
+                        found = true;
+                        setSecondaryPlan(mapPlan, rearrange, secondarySortKeyInfo);
+                    }
+                }
+            }
+            if (!found) {
+                int errorCode = 2214;
+                throw new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
+            }
+        }
+    }
+
+    /**
+     * Installs on {@code rearrange} one secondary key plan per column chain of
+     * the secondary key. Each plan is a chain of POProjects, or a single star
+     * projection when the key is the whole input.
+     *
+     * @throws VisitorException (error 2204) if building a plan fails
+     */
+    private static void setSecondaryPlan(PhysicalPlan plan, POLocalRearrange rearrange,
+            SortKeyInfo secondarySortKeyInfo) throws VisitorException {
+        try {
+            String scope = rearrange.getOperatorKey().scope;
+            List<PhysicalPlan> secondaryPlanList = new ArrayList<PhysicalPlan>();
+            for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo.getColumnChains()) {
+                PhysicalPlan secondaryPlan = new PhysicalPlan();
+                // The first item in columnChainInfo indicates the index of the
+                // input, which the caller has already addressed
+                for (int i = 1; i < columnChainInfo.size(); i++) {
+                    ColumnInfo columnInfo = columnChainInfo.getColumnInfo(i);
+                    POProject project = new POProject(
+                            new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+                            rearrange.getRequestedParallelism());
+                    if (columnInfo.isRangeProject()) {
+                        project.setProjectToEnd(columnInfo.getStartCol());
+                    } else {
+                        // Copy instead of downcasting the List returned by getColumns()
+                        project.setColumns(new ArrayList<Integer>(columnInfo.getColumns()));
+                    }
+                    project.setResultType(columnInfo.getResultType());
+                    secondaryPlan.addAsLeaf(project);
+                }
+                if (secondaryPlan.isEmpty()) {
+                    // Secondary key sorts on the input as a whole
+                    POProject project = new POProject(
+                            new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+                            rearrange.getRequestedParallelism());
+                    project.setStar(true);
+                    secondaryPlan.addAsLeaf(project);
+                }
+                secondaryPlanList.add(secondaryPlan);
+            }
+            rearrange.setSecondaryPlans(secondaryPlanList);
+        } catch (PlanException e) {
+            int errorCode = 2204;
+            throw new VisitorException("Error setting secondary key plan",
+                    errorCode, e);
+        }
+    }
+
+    /** Clones a column chain; ColumnChainInfo implements Cloneable, so failure is a bug. */
+    private static ColumnChainInfo cloneColumnChain(ColumnChainInfo chain) {
+        try {
+            return (ColumnChainInfo) chain.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    /**
+     * Finds eligible sort and distinct physical operators in one inner plan of
+     * the reduce-side POForEach.
+     *
+     * <p>Checks the sort/distinct keys (for distinct, the key is always the
+     * entire bag). If a key is covered by the main key, the operator can simply
+     * rely on the sorted input. Otherwise the first such key becomes the single
+     * secondary sort key. Later sort/distinct operators not covered by either
+     * key cannot be optimized, because there is only one secondary sort key.
+     */
+    private static class SecondaryKeyDiscover {
+        private final PhysicalPlan mPlan;
+
+        private final List<POSort> sortsToRemove = new ArrayList<POSort>();
+
+        private final List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();
+
+        private final List<SortKeyInfo> sortKeyInfos;
+
+        // Secondary key chosen so far (possibly by a previously visited inner plan)
+        private SortKeyInfo secondarySortKeyInfo;
+
+        // Column chain accumulated from the POProjects on the current path
+        private ColumnChainInfo columnChainInfo = null;
+
+        // PhysicalPlan here is foreach inner plan
+        SecondaryKeyDiscover(PhysicalPlan plan,
+                List<SortKeyInfo> sortKeyInfos, SortKeyInfo secondarySortKeyInfo) {
+            this.mPlan = plan;
+            this.sortKeyInfos = sortKeyInfos;
+            this.secondarySortKeyInfo = secondarySortKeyInfo;
+        }
+
+        /** Visits every root of the inner plan, starting a fresh column chain for each. */
+        public void process() throws FrontendException {
+            List<PhysicalOperator> roots = mPlan.getRoots();
+            for (PhysicalOperator root : roots) {
+                columnChainInfo = new ColumnChainInfo();
+                processRoot(root);
+            }
+        }
+
+        /**
+         * Follows the single-successor chain from {@code root}. It stops at a
+         * UDF, union or nested foreach, or when a sort key is not a plain
+         * projection chain.
+         *
+         * @throws FrontendException (error 2215) if an operator has several successors
+         */
+        public void processRoot(PhysicalOperator root) throws FrontendException {
+            PhysicalOperator currentNode = root;
+            while (currentNode != null) {
+                boolean sawInvalidPhysicalOper = false;
+                if (currentNode instanceof PODistinct) {
+                    sawInvalidPhysicalOper = processDistinct((PODistinct) currentNode);
+                } else if (currentNode instanceof POSort) {
+                    sawInvalidPhysicalOper = processSort((POSort) currentNode);
+                } else if (currentNode instanceof POProject) {
+                    sawInvalidPhysicalOper = processProject((POProject) currentNode);
+                } else if (currentNode instanceof POUserFunc
+                        || currentNode instanceof POUnion
+                        // We don't process foreach, since foreach is too complex to get right
+                        || currentNode instanceof POForEach) {
+                    break;
+                }
+
+                if (sawInvalidPhysicalOper) {
+                    break;
+                }
+
+                List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
+                if (succs == null) {
+                    currentNode = null;
+                } else {
+                    if (succs.size() > 1) {
+                        int errorCode = 2215;
+                        throw new FrontendException("See more than 1 successors in the nested plan for " + currentNode,
+                                errorCode);
+                    }
+                    currentNode = succs.get(0);
+                }
+            }
+        }
+
+        /**
+         * Decides whether an operator keyed on {@code keyInfo} can rely on sorted
+         * input. This is true if a main key or the current secondary key is
+         * moreSpecificThan {@code keyInfo}. Otherwise, if no secondary key has
+         * been chosen yet, {@code keyInfo} becomes the secondary key and true is
+         * returned.
+         */
+        private boolean acceptKey(SortKeyInfo keyInfo) throws FrontendException {
+            // if it is part of main key
+            for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
+                if (sortKeyInfo.moreSpecificThan(keyInfo)) {
+                    return true;
+                }
+            }
+            // if it is part of secondary key
+            if (secondarySortKeyInfo != null) {
+                return secondarySortKeyInfo.moreSpecificThan(keyInfo);
+            }
+            // Now set the secondary key
+            secondarySortKeyInfo = keyInfo;
+            return true;
+        }
+
+        /** We see PODistinct, check which key it is using; never stops the walk. */
+        public boolean processDistinct(PODistinct distinct) throws FrontendException {
+            SortKeyInfo keyInfos = new SortKeyInfo();
+            keyInfos.insertColumnChainInfo(0, cloneColumnChain(columnChainInfo), true);
+            if (acceptKey(keyInfos)) {
+                distinctsToChange.add(distinct);
+            }
+            return false;
+        }
+
+        /** Accumulate column info; never stops the walk. */
+        public boolean processProject(POProject project) throws FrontendException {
+            columnChainInfo.insertInReduce(project);
+            return false;
+        }
+
+        /**
+         * We see POSort, check which key it is using.
+         *
+         * @return true if a sort plan contains an operator other than POProject,
+         *         which stops the walk
+         * @throws FrontendException (error 2206) if a sort plan cannot be visited
+         */
+        public boolean processSort(POSort sort) throws FrontendException {
+            SortKeyInfo keyInfo = new SortKeyInfo();
+            for (int i = 0; i < sort.getSortPlans().size(); i++) {
+                PhysicalPlan sortPlan = sort.getSortPlans().get(i);
+                ColumnChainInfo sortChainInfo = cloneColumnChain(columnChainInfo);
+                boolean sawNonProject;
+                try {
+                    sawNonProject = collectColumnChain(sortPlan, sortChainInfo);
+                } catch (PlanException e) {
+                    int errorCode = 2206;
+                    throw new FrontendException("Error visiting POSort inner plan",
+                            errorCode, e);
+                }
+                if (sawNonProject) {
+                    return true;
+                }
+                keyInfo.insertColumnChainInfo(i, sortChainInfo, sort.getMAscCols().get(i));
+            }
+            if (acceptKey(keyInfo)) {
+                sortsToRemove.add(sort);
+            }
+            return false;
+        }
+
+        public List<POSort> getSortsToRemove() {
+            return sortsToRemove;
+        }
+
+        public List<PODistinct> getDistinctsToChange() {
+            return distinctsToChange;
+        }
+
+        public SortKeyInfo getSecondarySortKeyInfo() {
+            return secondarySortKeyInfo;
+        }
+    }
+
+    /**
+     * Appends the projections of a single-root, single-chain plan to
+     * {@code columnChainInfo}.
+     *
+     * @return true if the plan has several roots or contains an operator other
+     *         than POProject
+     * @throws PlanException (error 2208) if an operator has several successors
+     */
+    static private boolean collectColumnChain(PhysicalPlan plan,
+            ColumnChainInfo columnChainInfo) throws PlanException {
+        if (plan.getRoots().size() != 1) {
+            return true;
+        }
+
+        PhysicalOperator currentNode = plan.getRoots().get(0);
+
+        while (currentNode != null) {
+            if (!(currentNode instanceof POProject)) {
+                return true;
+            }
+            columnChainInfo.insertInReduce((POProject) currentNode);
+
+            List<PhysicalOperator> succs = plan.getSuccessors(currentNode);
+            if (succs == null) {
+                break;
+            }
+            if (succs.size() != 1) {
+                int errorCode = 2208;
+                throw new PlanException(
+                        "Exception visiting foreach inner plan", errorCode);
+            }
+            currentNode = succs.get(0);
+        }
+        return false;
+    }
+
+}
Modified: pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java Thu Jan 23 01:06:21 2014
@@ -42,6 +42,12 @@ public class NullableTuple extends PigNu
mValue = t;
}
+    /**
+     * Copy constructor. The new instance takes over the null flag and index of
+     * {@code copy} and refers to the same wrapped value object (the reference
+     * is copied; the tuple itself is not cloned).
+     *
+     * @param copy the instance to copy from
+     */
+    public NullableTuple(NullableTuple copy) {
+        this.setNull(copy.isNull());
+        // Shallow copy: both instances share the same tuple reference.
+        this.mValue = copy.mValue;
+        this.setIndex(copy.getIndex());
+    }
+
@Override
public Object getValueAsPigType() {
return isNull() ? null : (Tuple)mValue;
Modified: pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java Thu Jan 23 01:06:21 2014
@@ -61,6 +61,14 @@ public abstract class PigNullableWritabl
private byte mIndex;
+    /**
+     * Creates a shallow copy of {@code copy}. The copy is a new instance of the
+     * same runtime class, built through its no-argument constructor, carrying
+     * the same null flag, the same wrapped value reference and the same index.
+     *
+     * @param copy the writable to copy
+     * @return a new instance of {@code copy.getClass()}
+     * @throws Exception if the runtime class cannot be instantiated
+     *         reflectively (e.g. it has no accessible no-argument constructor)
+     */
+    public static PigNullableWritable newInstance(PigNullableWritable copy) throws Exception {
+        // Constructor.newInstance wraps constructor failures in
+        // InvocationTargetException, unlike the deprecated Class.newInstance,
+        // which rethrows checked constructor exceptions undeclared.
+        PigNullableWritable instance = copy.getClass().getDeclaredConstructor().newInstance();
+        instance.mNull = copy.mNull;
+        instance.mValue = copy.mValue;
+        instance.mIndex = copy.mIndex;
+        return instance;
+    }
+
/**
* Compare two nullable objects. Step one is to check if either or both
* are null. If one is null and the other is not, then the one that is
Modified: pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java Thu Jan 23 01:06:21 2014
@@ -17,20 +17,20 @@
*/
package org.apache.pig.test;
-import java.io.*;
+import java.io.IOException;
import java.util.Properties;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.pig.ExecType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
/**
- * This class builds a single instance of itself with the Singleton
- * design pattern. While building the single instance, it sets up a
- * mini cluster that actually consists of a mini DFS cluster and a
- * mini MapReduce cluster on the local machine and also sets up the
+ * This class builds a single instance of itself with the Singleton
+ * design pattern. While building the single instance, it sets up a
+ * mini cluster that actually consists of a mini DFS cluster and a
+ * mini MapReduce cluster on the local machine and also sets up the
* environment for Pig to run on top of the mini cluster.
*
* This class is the base class for MiniCluster, which has slightly
@@ -45,6 +45,9 @@ abstract public class MiniGenericCluster
protected static MiniGenericCluster INSTANCE = null;
protected static boolean isSetup = false;
+    /** Value of test.exec.type selecting the MapReduce mini cluster. */
+    public static final String EXECTYPE_MR = "mr";
+    /** Value of test.exec.type selecting the Tez mini cluster. */
+    public static final String EXECTYPE_TEZ = "tez";
+
/**
* Returns the single instance of class MiniGenericCluster that represents
* the resources for a mini dfs cluster and a mini mr (or tez) cluster. The
@@ -58,9 +61,16 @@ abstract public class MiniGenericCluster
throw new RuntimeException("test.exec.type is not set");
}
- if (execType.equalsIgnoreCase("mr")) {
+ return buildCluster(execType);
+ }
+ return INSTANCE;
+ }
+
+ public static MiniGenericCluster buildCluster(String execType) {
+ if (INSTANCE == null) {
+ if (execType.equalsIgnoreCase(EXECTYPE_MR)) {
INSTANCE = new MiniCluster();
- } else if (execType.equalsIgnoreCase("tez")) {
+ } else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) {
INSTANCE = new TezMiniCluster();
} else {
throw new RuntimeException("Unknown test.exec.type: " + execType);
@@ -73,22 +83,25 @@ abstract public class MiniGenericCluster
return INSTANCE;
}
+ abstract protected ExecType getExecType();
+
abstract protected void setupMiniDfsAndMrClusters();
public void shutDown(){
INSTANCE.shutdownMiniDfsAndMrClusters();
}
-
+
+ @Override
protected void finalize() {
shutdownMiniDfsAndMrClusters();
}
-
+
protected void shutdownMiniDfsAndMrClusters() {
isSetup = false;
shutdownMiniDfsClusters();
shutdownMiniMrClusters();
}
-
+
protected void shutdownMiniDfsClusters() {
try {
if (m_fileSys != null) { m_fileSys.close(); }
@@ -99,7 +112,7 @@ abstract public class MiniGenericCluster
m_fileSys = null;
m_dfs = null;
}
-
+
abstract protected void shutdownMiniMrClusters();
public Properties getProperties() {
@@ -115,12 +128,12 @@ abstract public class MiniGenericCluster
errorIfNotSetup();
m_conf.set(name, value);
}
-
+
public FileSystem getFileSystem() {
errorIfNotSetup();
return m_fileSys;
}
-
+
/**
* Throw RunTimeException if isSetup is false
*/
Modified: pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java Thu Jan 23 01:06:21 2014
@@ -37,7 +37,6 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
public class TestAccumulator {
@@ -413,8 +412,6 @@ public class TestAccumulator {
}
@Test
- @Ignore
- // TODO: Enabled the test case after SecondaryKeyOptimization is implemented in Tez
public void testAccumWithDistinct() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
pigServer.registerQuery("B = group A by id;");
@@ -429,15 +426,16 @@ public class TestAccumulator {
Iterator<Tuple> iter = pigServer.openIterator("C");
+ int count = 0;
while(iter.hasNext()) {
+ count++;
Tuple t = iter.next();
assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
}
+ assertEquals(4, count);
}
@Test
- @Ignore
- // TODO: Enabled the test case after SecondaryKeyOptimization is implemented in Tez
public void testAccumWithSort() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, f);");
pigServer.registerQuery("B = foreach A generate id, f, id as t;");
@@ -453,10 +451,13 @@ public class TestAccumulator {
Iterator<Tuple> iter = pigServer.openIterator("D");
+ int count = 0;
while(iter.hasNext()) {
+ count++;
Tuple t = iter.next();
assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));
}
+ assertEquals(4, count);
}
@Test
@@ -683,8 +684,6 @@ public class TestAccumulator {
* @throws ParseException
*/
@Test
- @Ignore
- // TODO: Enabled the test case after SecondaryKeyOptimization is implemented in Tez
public void testAccumAfterNestedOp() throws IOException, ParserException{
// test group by
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
Modified: pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCombiner.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestCombiner.java Thu Jan 23 01:06:21 2014
@@ -50,7 +50,6 @@ import org.junit.Test;
public class TestCombiner {
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
- static String execType = System.getProperty("test.exec.type");
@BeforeClass
public static void oneTimeSetUp() throws Exception {
@@ -88,7 +87,7 @@ public class TestCombiner {
"c = group a by c2; " +
"f = foreach c generate COUNT(org.apache.pig.builtin.Distinct($1.$2)); " +
"store f into 'out';";
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext pc = pigServer.getPigContext();
assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
.isEmpty()));
@@ -103,7 +102,7 @@ public class TestCombiner {
"f = foreach c generate COUNT(" + dummyUDF + "" +
"(org.apache.pig.builtin.Distinct($1.$2)," + dummyUDF + "())); " +
"store f into 'out';";
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext pc = pigServer.getPigContext();
assertTrue((Util.buildMRPlan(Util.buildPp(pigServer, query), pc).getRoots().get(0).combinePlan
.isEmpty()));
@@ -113,7 +112,7 @@ public class TestCombiner {
@Test
public void testOnCluster() throws Exception {
// run the test on cluster
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
String inputFileName = runTest(pigServer);
Util.deleteFile(cluster, inputFileName);
pigServer.shutdown();
@@ -190,7 +189,7 @@ public class TestCombiner {
Util.createInputFile(cluster, "MultiCombinerUseInput.txt", input);
Properties props = cluster.getProperties();
props.setProperty("io.sort.mb", "1");
- PigServer pigServer = new PigServer(execType, props);
+ PigServer pigServer = new PigServer(cluster.getExecType(), props);
pigServer.registerQuery("a = load 'MultiCombinerUseInput.txt' as (x:int);");
pigServer.registerQuery("b = group a all;");
pigServer.registerQuery("c = foreach b generate COUNT(a), SUM(a.$0), " +
@@ -231,7 +230,7 @@ public class TestCombiner {
"pig1\t20\t3.1" };
Util.createInputFile(cluster, "distinctAggs1Input.txt", input);
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load 'distinctAggs1Input.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b {" +
@@ -255,7 +254,7 @@ public class TestCombiner {
while (it.hasNext()) {
Tuple t = it.next();
List<Object> fields = t.getAll();
- Object[] expected = results.get((String)fields.get(0));
+ Object[] expected = results.get(fields.get(0));
int i = 0;
for (Object field : fields) {
assertEquals(expected[i++], field);
@@ -278,7 +277,7 @@ public class TestCombiner {
};
Util.createInputFile(cluster, "testGroupElements.txt", input);
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load 'testGroupElements.txt' as (str:chararray, num1:int, alph : chararray, num2 : int);");
pigServer.registerQuery("b = group a by (str, num1);");
@@ -339,7 +338,7 @@ public class TestCombiner {
};
Util.createInputFile(cluster, "testGroupLimit.txt", input);
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load 'testGroupLimit.txt' using PigStorage(' ') " +
"as (str:chararray, num1:int) ;");
pigServer.registerQuery("b = group a by str;");
@@ -388,7 +387,7 @@ public class TestCombiner {
"pig1\t20\t3.1" };
Util.createInputFile(cluster, "distinctNoCombinerInput.txt", input);
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load 'distinctNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b {" +
@@ -409,7 +408,7 @@ public class TestCombiner {
while (it.hasNext()) {
Tuple t = it.next();
List<Object> fields = t.getAll();
- Object[] expected = results.get((String)fields.get(0));
+ Object[] expected = results.get(fields.get(0));
int i = 0;
for (Object field : fields) {
if (i == 1) {
@@ -439,7 +438,7 @@ public class TestCombiner {
"pig1\t20\t3.1" };
Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b {" +
@@ -460,7 +459,7 @@ public class TestCombiner {
while (it.hasNext()) {
Tuple t = it.next();
List<Object> fields = t.getAll();
- Object[] expected = results.get((String)fields.get(0));
+ Object[] expected = results.get(fields.get(0));
int i = 0;
for (Object field : fields) {
if (i == 1) {
@@ -498,7 +497,7 @@ public class TestCombiner {
try {
Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a by name;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.age), a;");
@@ -520,6 +519,7 @@ public class TestCombiner {
public static class JiraPig1030 extends EvalFunc<DataBag> {
+ @Override
public DataBag exec(Tuple input) throws IOException {
return new DefaultDataBag();
}
@@ -543,7 +543,7 @@ public class TestCombiner {
try {
Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
- PigServer pigServer = new PigServer(execType, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
pigServer.registerQuery("b = group a all;");
pigServer.registerQuery("c = foreach b {" +
Added: pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java (added)
+++ pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for custom partitioners supplied with the PARTITION BY clause on JOIN,
+ * GROUP, DISTINCT and CROSS. Queries run on a {@link MiniGenericCluster}, using
+ * the execution type reported by the cluster.
+ */
+@RunWith(JUnit4.class)
+public class TestCustomPartitioner {
+
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+    private PigServer pigServer;
+
+    TupleFactory mTf = TupleFactory.getInstance();
+    BagFactory mBf = BagFactory.getInstance();
+
+    @Before
+    public void setUp() throws Exception{
+        FileLocalizer.setR(new Random());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+
+    /**
+     * Drains {@code iter} and asserts that the string forms of its tuples are exactly
+     * {@code expected}. Both sides are sorted before comparing, so output order does
+     * not matter but missing, extra or duplicated tuples do.
+     */
+    private static void assertTuplesIgnoringOrder(Iterator<Tuple> iter, String... expected) {
+        List<String> expectedList = new ArrayList<String>(Arrays.asList(expected));
+        Collections.sort(expectedList);
+
+        List<String> actual = new ArrayList<String>();
+        while (iter.hasNext()) {
+            actual.add(iter.next().toString());
+        }
+        Collections.sort(actual);
+
+        Assert.assertEquals(expectedList, actual);
+    }
+
+    /**
+     * Copies an output part file from the cluster to the same relative local path and
+     * returns its line count. The local parent directory must already exist. The reader
+     * is always closed so the local directory can be deleted afterwards.
+     */
+    private static int countLinesInPartFile(String path) throws IOException {
+        Util.copyFromClusterToLocal(cluster, path, path);
+        BufferedReader reader = new BufferedReader(new FileReader(path));
+        try {
+            int count = 0;
+            while (reader.readLine() != null) {
+                count++;
+            }
+            return count;
+        } finally {
+            reader.close();
+        }
+    }
+
+    // See PIG-282
+    @Test
+    @Ignore
+    // Fails with Tez - DefaultSorter.java Illegal partition for Null: false index: 0 1 (-1), TotalPartitions: 0
+    public void testCustomPartitionerParseJoins() throws Exception{
+        String[] input = {
+                "1\t3",
+                "1\t2"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
+
+        // Always remove the input so a failed run does not leave it behind on the cluster
+        try {
+            // Custom Partitioner is not allowed for skewed joins; registering one must
+            // fail with a FrontendException
+            try {
+                pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
+                pigServer.registerQuery("B = ORDER A by $0;");
+                pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+                //control should not reach here
+                Assert.fail("Skewed join cannot accept a custom partitioner");
+            } catch(FrontendException e) {
+                Assert.assertTrue( e.getMessage().contains( "Custom Partitioner is not supported for skewed join" ) );
+            }
+
+            pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+            assertTuplesIgnoringOrder(pigServer.openIterator("hash"),
+                    "(1,3,1,2)", "(1,3,1,3)", "(1,2,1,2)", "(1,2,1,3)");
+
+            // No checks are made for merged and replicated joins as they are compiled to a map only job
+            // No frontend error checking has been added for these jobs, hence not adding any test cases
+            // Manually tested the sanity once. Above test should cover the basic sanity of the scenario
+        } finally {
+            Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
+        }
+    }
+
+    // See PIG-282
+    @Test
+    public void testCustomPartitionerGroups() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "3\t1",
+                "4\t1"
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
+
+        try {
+            pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
+
+            // It should be noted that for a map reduce job, the total number of partitions
+            // is the same as the number of reduce tasks for the job. Hence we need to find a case wherein
+            // we will get more than one reduce job so that we can use the partitioner.
+            // The following logic assumes that we get 2 reduce jobs, so that we can hard-code the logic.
+            // SimpleCustomPartitioner3 simply returns '1' (second reducer) for all inputs when
+            // partition number is bigger than 1.
+            //
+            pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+
+            pigServer.store("B", "tmp_testCustomPartitionerGroups");
+
+            new File("tmp_testCustomPartitionerGroups").mkdir();
+
+            Assert.assertEquals("Partition 0 should be empty. Most likely Custom Partitioner was not used.",
+                    0, countLinesInPartFile("tmp_testCustomPartitionerGroups/part-r-00000"));
+            // all outputs should come to partition 1 (with SimpleCustomPartitioner3)
+            Assert.assertEquals(4, countLinesInPartFile("tmp_testCustomPartitionerGroups/part-r-00001"));
+        } finally {
+            Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
+            Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
+            Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
+        }
+    }
+
+    // See PIG-3385
+    @Test
+    public void testCustomPartitionerDistinct() throws Exception{
+        String[] input = {
+                "1\t1",
+                "2\t1",
+                "1\t1",
+                "3\t1",
+                "4\t1",
+        };
+        Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
+
+        try {
+            pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
+            pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
+            pigServer.store("B", "tmp_testCustomPartitionerDistinct");
+
+            new File("tmp_testCustomPartitionerDistinct").mkdir();
+
+            // SimpleCustomPartitioner3 sends all inputs to the *second* reducer
+            Assert.assertEquals("Partition 0 should be empty. Most likely Custom Partitioner was not used.",
+                    0, countLinesInPartFile("tmp_testCustomPartitionerDistinct/part-r-00000"));
+            // all 4 distinct rows should come to partition 1 (with SimpleCustomPartitioner3)
+            Assert.assertEquals(4, countLinesInPartFile("tmp_testCustomPartitionerDistinct/part-r-00001"));
+        } finally {
+            Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
+            Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
+            Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
+        }
+    }
+
+    // See PIG-282
+    @Test
+    @Ignore
+    // TODO: CROSS not implemented in TEZ yet
+    public void testCustomPartitionerCross() throws Exception{
+        String[] input = {
+                "1\t3",
+                "1\t2",
+        };
+
+        Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
+        try {
+            pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
+            pigServer.registerQuery("B = ORDER A by $0;");
+            pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
+            assertTuplesIgnoringOrder(pigServer.openIterator("C"),
+                    "(1,3,1,2)", "(1,3,1,3)", "(1,2,1,2)", "(1,2,1,3)");
+        } finally {
+            Util.deleteFile(cluster, "table_testCustomPartitionerCross");
+        }
+    }
+}
Modified: pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java Thu Jan 23 01:06:21 2014
@@ -492,192 +492,6 @@ public class TestEvalPipeline2 {
Util.deleteFile(cluster, "table_testNestedDescSort");
}
- // See PIG-282
- @Test
- public void testCustomPartitionerParseJoins() throws Exception{
- String[] input = {
- "1\t3",
- "1\t2"
- };
- Util.createInputFile(cluster, "table_testCustomPartitionerParseJoins", input);
-
- // Custom Partitioner is not allowed for skewed joins, will throw a ExecException
- try {
- pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerParseJoins' as (a0:int, a1:int);");
- pigServer.registerQuery("B = ORDER A by $0;");
- pigServer.registerQuery("skewed = JOIN A by $0, B by $0 USING 'skewed' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
- //control should not reach here
- Assert.fail("Skewed join cannot accept a custom partitioner");
- } catch(FrontendException e) {
- Assert.assertTrue( e.getMessage().contains( "Custom Partitioner is not supported for skewed join" ) );
- }
-
- pigServer.registerQuery("hash = JOIN A by $0, B by $0 USING 'hash' PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
- Iterator<Tuple> iter = pigServer.openIterator("hash");
- Tuple t;
-
- Collection<String> results = new HashSet<String>();
- results.add("(1,3,1,2)");
- results.add("(1,3,1,3)");
- results.add("(1,2,1,2)");
- results.add("(1,2,1,3)");
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- // No checks are made for merged and replicated joins as they are compiled to a map only job
- // No frontend error checking has been added for these jobs, hence not adding any test cases
- // Manually tested the sanity once. Above test should cover the basic sanity of the scenario
-
- Util.deleteFile(cluster, "table_testCustomPartitionerParseJoins");
- }
-
- // See PIG-282
- @Test
- public void testCustomPartitionerGroups() throws Exception{
- String[] input = {
- "1\t1",
- "2\t1",
- "3\t1",
- "4\t1"
- };
- Util.createInputFile(cluster, "table_testCustomPartitionerGroups", input);
-
- pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerGroups' as (a0:int, a1:int);");
-
- // It should be noted that for a map reduce job, the total number of partitions
- // is the same as the number of reduce tasks for the job. Hence we need to find a case wherein
- // we will get more than one reduce job so that we can use the partitioner.
- // The following logic assumes that we get 2 reduce jobs, so that we can hard-code the logic.
- // SimpleCustomPartitioner3 simply returns '1' (second reducer) for all inputs when
- // partition number is bigger than 1.
- //
- pigServer.registerQuery("B = group A by $0 PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
-
- pigServer.store("B", "tmp_testCustomPartitionerGroups");
-
- new File("tmp_testCustomPartitionerGroups").mkdir();
-
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00000", "tmp_testCustomPartitionerGroups/part-r-00000");
- BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00000"));
- String line = null;
- while((line = reader.readLine()) != null) {
- Assert.fail("Partition 0 should be empty. Most likely Custom Partitioner was not used.");
- }
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerGroups/part-r-00001", "tmp_testCustomPartitionerGroups/part-r-00001");
- reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerGroups/part-r-00001"));
- line = null;
- int count=0;
- while((line = reader.readLine()) != null) {
- //all outputs should come to partion 1 (with SimpleCustomPartitioner3)
- count++;
- }
- Assert.assertEquals(4, count);
- Util.deleteDirectory(new File("tmp_testCustomPartitionerGroups"));
- Util.deleteFile(cluster, "tmp_testCustomPartitionerGroups");
- Util.deleteFile(cluster, "table_testCustomPartitionerGroups");
- }
-
- // See PIG-3385
- @Test
- public void testCustomPartitionerDistinct() throws Exception{
- String[] input = {
- "1\t1",
- "2\t1",
- "1\t1",
- "3\t1",
- "4\t1",
- };
- Util.createInputFile(cluster, "table_testCustomPartitionerDistinct", input);
-
- pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerDistinct' as (a0:int, a1:int);");
- pigServer.registerQuery("B = distinct A PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner3 parallel 2;");
- pigServer.store("B", "tmp_testCustomPartitionerDistinct");
-
- new File("tmp_testCustomPartitionerDistinct").mkdir();
-
- // SimpleCustomPartitioner3 simply partition all inputs to *second* reducer
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00000", "tmp_testCustomPartitionerDistinct/part-r-00000");
- BufferedReader reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00000"));
- String line = null;
- while((line = reader.readLine()) != null) {
- Assert.fail("Partition 0 should be empty. Most likely Custom Partitioner was not used.");
- }
- Util.copyFromClusterToLocal(cluster, "tmp_testCustomPartitionerDistinct/part-r-00001", "tmp_testCustomPartitionerDistinct/part-r-00001");
- reader = new BufferedReader(new FileReader("tmp_testCustomPartitionerDistinct/part-r-00001"));
- line = null;
- int count=0;
- while((line = reader.readLine()) != null) {
- //all outputs should come to partion 1 (with SimpleCustomPartitioner3)
- count++;
- }
- Assert.assertEquals(4, count);
- Util.deleteDirectory(new File("tmp_testCustomPartitionerDistinct"));
- Util.deleteFile(cluster, "tmp_testCustomPartitionerDistinct");
- Util.deleteFile(cluster, "table_testCustomPartitionerDistinct");
- }
-
- // See PIG-282
- @Test
- public void testCustomPartitionerCross() throws Exception{
- String[] input = {
- "1\t3",
- "1\t2",
- };
-
- Util.createInputFile(cluster, "table_testCustomPartitionerCross", input);
- pigServer.registerQuery("A = LOAD 'table_testCustomPartitionerCross' as (a0:int, a1:int);");
- pigServer.registerQuery("B = ORDER A by $0;");
- pigServer.registerQuery("C = cross A , B PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner;");
- Iterator<Tuple> iter = pigServer.openIterator("C");
- Tuple t;
-
- Collection<String> results = new HashSet<String>();
- results.add("(1,3,1,2)");
- results.add("(1,3,1,3)");
- results.add("(1,2,1,2)");
- results.add("(1,2,1,3)");
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(results.contains(t.toString()));
-
- Util.deleteFile(cluster, "table_testCustomPartitionerCross");
- }
-
// See PIG-972
@Test
public void testDescribeNestedAlias() throws Exception{
Modified: pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestPigServer.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestPigServer.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestPigServer.java Thu Jan 23 01:06:21 2014
@@ -535,16 +535,16 @@ public class TestPigServer {
pig.registerQuery("c = group b by site;");
pig.registerQuery("d = foreach c generate FLATTEN($1);");
pig.registerQuery("e = group d by $2;");
-
+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
pig.explain("e", "xml", true, false, ps, ps, null, null);
-
+
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
Document doc = dBuilder.parse(bais);
-
+
//Verify Logical and Physical Plans aren't supported.
NodeList logicalPlan = doc.getElementsByTagName("logicalPlan");
assertEquals(1, logicalPlan.getLength());
@@ -552,20 +552,20 @@ public class TestPigServer {
NodeList physicalPlan = doc.getElementsByTagName("physicalPlan");
assertEquals(1, physicalPlan.getLength());
assertTrue(physicalPlan.item(0).getTextContent().contains("Not Supported"));
-
+
//Verify we have two loads and one is temporary
NodeList loads = doc.getElementsByTagName("POLoad");
assertEquals(2, loads.getLength());
-
+
boolean sawTempLoad = false;
boolean sawNonTempLoad = false;
for (int i = 0; i < loads.getLength(); i++) {
Boolean isTempLoad = null;
boolean hasAlias = false;
-
+
Node poLoad = loads.item(i);
NodeList children = poLoad.getChildNodes();
-
+
for (int j = 0; j < children.getLength(); j++) {
Node child = children.item(j);
if (child.getNodeName().equals("alias")) {
@@ -579,7 +579,7 @@ public class TestPigServer {
}
}
}
-
+
if (isTempLoad == null) {
fail("POLoad elements should have isTmpLoad child node.");
} else if (isTempLoad && hasAlias) {
@@ -587,11 +587,11 @@ public class TestPigServer {
} else if (!isTempLoad && !hasAlias) {
fail("Non temporary loads should be associated with alias.");
}
-
+
sawTempLoad = sawTempLoad || isTempLoad;
sawNonTempLoad = sawNonTempLoad || !isTempLoad;
}
-
+
assertTrue(sawTempLoad && sawNonTempLoad);
}
Modified: pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java Thu Jan 23 01:06:21 2014
@@ -27,45 +27,49 @@ import java.io.PrintStream;
import java.util.Iterator;
import java.util.List;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SecondaryKeyOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.VisitorException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
-public class TestSecondarySort {
- static MiniCluster cluster = MiniCluster.buildCluster();
- private PigServer pigServer;
-
- static PigContext pc;
- static {
- pc = new PigContext(ExecType.MAPREDUCE, MiniCluster.buildCluster().getProperties());
- try {
- pc.connect();
- } catch (ExecException e) {
- throw new RuntimeException(e);
- }
- }
+public abstract class TestSecondarySort {
+ protected static MiniGenericCluster cluster = null;
+ protected PigServer pigServer;
+ protected static PigContext pc;
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
+ cluster = null;
}
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ if (cluster == null) {
+ cluster = getCluster();
+ pc = new PigContext(cluster.getExecType(), cluster.getProperties());
+ try {
+ pc.connect();
+ } catch (ExecException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ pigServer = new PigServer(pc);
}
+ public abstract SecondaryKeyOptimizer visitSecondaryKeyOptimizer(
+ String query) throws Exception, VisitorException;
+
+ public abstract MiniGenericCluster getCluster();
+
@Test
public void testDistinctOptimization1() throws Exception {
// Distinct on one entire input
@@ -74,15 +78,11 @@ public class TestSecondarySort {
"C = foreach B { D = distinct A; generate group, D;};"+
"store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
+ SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
- assertEquals(1, so.getNumMRUseSecondaryKey());
+ assertEquals(1, so.getNumUseSecondaryKey());
assertEquals(0, so.getNumSortRemoved());
- assertEquals(1, so.getDistinctChanged());
+ assertEquals(1, so.getNumDistinctChanged());
}
@Test
@@ -93,15 +93,11 @@ public class TestSecondarySort {
"C = foreach B { D = limit A 10; E = order D by $1; generate group, E;};" +
"store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+ SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(1, so.getNumMRUseSecondaryKey());
+ assertEquals(1, so.getNumUseSecondaryKey());
assertEquals(1, so.getNumSortRemoved());
- assertEquals(0, so.getDistinctChanged());
+ assertEquals(0, so.getNumDistinctChanged());
}
@Test
@@ -112,15 +108,11 @@ public class TestSecondarySort {
"C = foreach B { D = limit A 10; E = order D by $0; generate group, E;};" +
"store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
+ SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
- assertEquals(0, so.getNumMRUseSecondaryKey());
+ assertEquals(0, so.getNumUseSecondaryKey());
assertEquals(1, so.getNumSortRemoved());
- assertEquals(0, so.getDistinctChanged());
+ assertEquals(0, so.getNumDistinctChanged());
}
@Test
@@ -131,15 +123,11 @@ public class TestSecondarySort {
"C = foreach B { D = limit A 10; E = order D by $1; F = order E by $0; generate group, F;};"+
"store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+ SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(1, so.getNumMRUseSecondaryKey());
+ assertEquals(1, so.getNumUseSecondaryKey());
assertEquals(2, so.getNumSortRemoved());
- assertEquals(0, so.getDistinctChanged());
+ assertEquals(0, so.getNumDistinctChanged());
}
@Test
@@ -150,15 +138,11 @@ public class TestSecondarySort {
"C = foreach B { D = limit A 10; E = order D by $0, $1, $2; generate group, E;};" +
"store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
- assertEquals(1, so.getNumMRUseSecondaryKey());
+ SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+ assertEquals(1, so.getNumUseSecondaryKey());
assertEquals(1, so.getNumSortRemoved());
- assertEquals(0, so.getDistinctChanged());
+ assertEquals(0, so.getNumDistinctChanged());
}
@Test
@@ -168,15 +152,11 @@ public class TestSecondarySort {
"B = group A by $0;" +
"C = foreach B { D = limit A 10; E = order D by $1; F = order E by $2; generate group, F;};" +
"store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(1, so.getNumMRUseSecondaryKey());
+ SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+ assertEquals(1, so.getNumUseSecondaryKey());
assertEquals(1, so.getNumSortRemoved());
- assertEquals(0, so.getDistinctChanged());
+ assertEquals(0, so.getNumDistinctChanged());
}
@Test
@@ -187,15 +167,11 @@ public class TestSecondarySort {
"C = foreach B { D = order A by $0 desc; generate group, D;};" +
"store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
- assertEquals(1, so.getNumMRUseSecondaryKey());
+ SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+ assertEquals(1, so.getNumUseSecondaryKey());
assertEquals(1, so.getNumSortRemoved());
- assertEquals(0, so.getDistinctChanged());
+ assertEquals(0, so.getNumDistinctChanged());
}
@Test
@@ -206,15 +182,11 @@ public class TestSecondarySort {
"C = foreach B { D = order A by $0, $1 desc; generate group, D;};" +
"store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
- assertEquals(1, so.getNumMRUseSecondaryKey());
+ SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+ assertEquals(1, so.getNumUseSecondaryKey());
assertEquals(1, so.getNumSortRemoved());
- assertEquals(0, so.getDistinctChanged());
+ assertEquals(0, so.getNumDistinctChanged());
}
// See PIG-1193
@@ -226,15 +198,11 @@ public class TestSecondarySort {
"C = foreach B { D = order A by $0 desc; generate DIFF(D, D);};" +
"store C into 'output';");
- PhysicalPlan pp = Util.buildPp(pigServer, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
- SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
- so.visit();
-
- assertEquals(1, so.getNumMRUseSecondaryKey());
+ SecondaryKeyOptimizer so = visitSecondaryKeyOptimizer(query);
+ assertEquals(1, so.getNumUseSecondaryKey());
assertEquals(2, so.getNumSortRemoved());
- assertEquals(0, so.getDistinctChanged());
+ assertEquals(0, so.getNumDistinctChanged());
}
@Test
@@ -403,25 +371,79 @@ public class TestSecondarySort {
@Test
public void testNestedSortMultiQueryEndToEnd1() throws Exception {
- pigServer.setBatchOn();
- Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd",
- "testNestedSortMultiQueryEndToEnd1-input.txt");
- pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd1-input.txt'"
- + " using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- pigServer.registerQuery("b = group a by uname parallel 2;");
- pigServer.registerQuery("c = group a by gid parallel 2;");
- pigServer.registerQuery("d = foreach b generate SUM(a.gid);");
- pigServer.registerQuery("e = foreach c { f = order a by uid; generate group, f; };");
- pigServer.registerQuery("store d into '/tmp/output1';");
- pigServer.registerQuery("store e into '/tmp/output2';");
-
- List<ExecJob> jobs = pigServer.executeBatch();
- for (ExecJob job : jobs) {
- assertEquals(ExecJob.JOB_STATUS.COMPLETED, job.getStatus());
+ try {
+ pigServer.setBatchOn();
+ Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd",
+ "testNestedSortMultiQueryEndToEnd1-input.txt");
+ pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd1-input.txt'"
+ + " using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ pigServer.registerQuery("b = group a by uname parallel 2;");
+ pigServer.registerQuery("c = group a by gid parallel 2;");
+ pigServer.registerQuery("d = foreach b generate SUM(a.gid);");
+ pigServer.registerQuery("e = foreach c { f = order a by uid; generate group, f; };");
+ pigServer.registerQuery("store d into '/tmp/output1';");
+ pigServer.registerQuery("store e into '/tmp/output2';");
+
+ List<ExecJob> jobs = pigServer.executeBatch();
+ for (ExecJob job : jobs) {
+ assertEquals(ExecJob.JOB_STATUS.COMPLETED, job.getStatus());
+ }
+ } finally {
+ FileLocalizer.delete("/tmp/output1", pigServer.getPigContext());
+ FileLocalizer.delete("/tmp/output2", pigServer.getPigContext());
+ Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd1-input.txt");
+ }
+ }
+
+ // Runs two group-bys over one input as a single multi-query batch (one of them with a
+ // nested ORDER BY on a0, a2), then loads both outputs back and checks their contents.
+ @Test
+ public void testNestedSortMultiQueryEndToEnd2() throws Exception {
+ File input = Util.createTempFileDelOnExit("test", "txt");
+ PrintStream ps1 = new PrintStream(new FileOutputStream(input));
+ ps1.println("2\t2\t4");
+ ps1.println("2\t3\t4");
+ ps1.println("1\t2\t3");
+ ps1.println("1\t3\t4");
+ ps1.println("1\t2\t4");
+ ps1.println("1\t2\t5");
+ ps1.close();
+ Util.copyFromLocalToCluster(cluster, input.getCanonicalPath(), "testNestedSortMultiQueryEndToEnd2-input.txt");
+
+ try {
+ pigServer.setBatchOn();
+ pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd2-input.txt' as (a0, a1, a2);");
+ pigServer.registerQuery("b = group a by a0 parallel 2;");
+ pigServer.registerQuery("c = group a by a1 parallel 2;");
+ pigServer.registerQuery("d = foreach b generate group, SUM(a.a2);");
+ pigServer.registerQuery("e = foreach c { f = order a by a0,a2; generate group, f; };");
+ pigServer.registerQuery("store d into '/tmp/output1';");
+ pigServer.registerQuery("store e into '/tmp/output2';");
+
+ List<ExecJob> jobs = pigServer.executeBatch();
+ for (ExecJob job : jobs) {
+ assertEquals(ExecJob.JOB_STATUS.COMPLETED, job.getStatus());
+ }
+ pigServer.registerQuery("D = load '/tmp/output1';");
+ pigServer.registerQuery("E = load '/tmp/output2';");
+ pigServer.executeBatch();
+
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+ Schema s = pigServer.dumpSchema("D");
+ String[] expectedD = { "(1,16.0)", "(2,8.0)" };
+ Util.checkQueryOutputsAfterSortRecursive(iter, expectedD,
+ org.apache.pig.newplan.logical.Util.translateSchema(s));
+
+ iter = pigServer.openIterator("E");
+ s = pigServer.dumpSchema("E");
+ String[] expectedE = { "(2,{(1,2,3),(1,2,4),(1,2,5),(2,2,4)})", "(3,{(1,3,4),(2,3,4)})" };
+ Util.checkQueryOutputsAfterSortRecursive(iter, expectedE,
+ org.apache.pig.newplan.logical.Util.translateSchema(s));
+ } finally {
+ FileLocalizer.delete("/tmp/output1", pigServer.getPigContext());
+ FileLocalizer.delete("/tmp/output2", pigServer.getPigContext());
+ Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd2-input.txt");
}
- FileLocalizer.delete("/tmp/output1", pigServer.getPigContext());
- FileLocalizer.delete("/tmp/output2", pigServer.getPigContext());
- Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd1-input.txt");
}
// See PIG-1978
Added: pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java (added)
+++ pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SecondaryKeyOptimizerMR;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+
+/**
+ * Runs the {@link TestSecondarySort} tests on the MapReduce execution engine: it supplies a
+ * {@link MiniGenericCluster} built for EXECTYPE_MR and drives {@link SecondaryKeyOptimizerMR}.
+ */
+public class TestSecondarySortMR extends TestSecondarySort {
+
+ @Override
+ public MiniGenericCluster getCluster() {
+ return MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_MR);
+ }
+
+ /**
+ * Compiles {@code query} to a physical plan and then an MR plan, and runs
+ * {@link SecondaryKeyOptimizerMR} over the MR plan.
+ *
+ * @param query Pig Latin script to compile
+ * @return the optimizer after its visit, so callers can read its counters
+ * @throws Exception if building either plan or visiting it fails
+ */
+ @Override
+ public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query) throws Exception {
+ PhysicalPlan pp = Util.buildPp(pigServer, query);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizerMR so = new SecondaryKeyOptimizerMR(mrPlan);
+ so.visit();
+ return so;
+ }
+}
+
Modified: pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java Thu Jan 23 01:06:21 2014
@@ -56,11 +56,11 @@ public class TestSkewedJoin {
private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
private PigServer pigServer;
- private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple", "5");
pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", "0.01");
createFiles();