Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC
svn commit: r1598702 [14/23] - in /pig/trunk: ./ ivy/
shims/src/hadoop23/org/apache/pig/backend/hadoop23/
shims/test/hadoop20/org/apache/pig/test/
shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/
src/org/apache/pig/ src/org/apache/pig/ba...
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Fri May 30 19:07:23 2014
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ColumnChainInfo;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ColumnInfo;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SortKeyInfo;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerUtil {
+ private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
+
+ private SecondaryKeyOptimizerUtil() {
+
+ }
+
+ private static class POToChange {
+ POToChange(PhysicalOperator oper, PhysicalPlan plan, POForEach forEach) {
+ this.oper = oper;
+ this.plan = plan;
+ this.forEach = forEach;
+ }
+
+ PhysicalOperator oper;
+
+ PhysicalPlan plan;
+
+ POForEach forEach;
+ }
+
+ public static class SecondaryKeyOptimizerInfo {
+
+ private int numSortRemoved = 0;
+ private int numDistinctChanged = 0;
+ private int numUseSecondaryKey = 0;
+ private boolean useSecondaryKey = false;
+ private boolean[] secondarySortOrder;
+
+ public int getNumSortRemoved() {
+ return numSortRemoved;
+ }
+
+ public void incrementNumSortRemoved() {
+ this.numSortRemoved++;
+ }
+
+ public int getNumDistinctChanged() {
+ return numDistinctChanged;
+ }
+
+ public void incrementNumDistinctChanged() {
+ this.numDistinctChanged++;
+ }
+
+ public int getNumUseSecondaryKey() {
+ return numUseSecondaryKey;
+ }
+
+ public void incrementNumUseSecondaryKey() {
+ this.numUseSecondaryKey++;
+ }
+
+ public boolean isUseSecondaryKey() {
+ return useSecondaryKey;
+ }
+
+ public void setUseSecondaryKey(boolean useSecondaryKey) {
+ this.useSecondaryKey = useSecondaryKey;
+ }
+
+ public boolean[] getSecondarySortOrder() {
+ return secondarySortOrder;
+ }
+
+ public void setSecondarySortOrder(boolean[] secondarySortOrder) {
+ this.secondarySortOrder = secondarySortOrder;
+ }
+
+ }
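+
+    // Illustrative usage sketch (the calling optimizer is an assumption, not
+    // shown in this commit):
+    //   SecondaryKeyOptimizerInfo info =
+    //       SecondaryKeyOptimizerUtil.applySecondaryKeySort(mapPlan, reducePlan);
+    //   if (info != null && info.isUseSecondaryKey()) {
+    //       // configure the shuffle comparator with info.getSecondarySortOrder()
+    //   }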
+
+ // Build sort key structure from POLocalRearrange
+ private static SortKeyInfo getSortKeyInfo(POLocalRearrange rearrange) throws ExecException {
+ SortKeyInfo result = new SortKeyInfo();
+ List<PhysicalPlan> plans = rearrange.getPlans();
+ nextPlan: for (int i = 0; i < plans.size(); i++) {
+ PhysicalPlan plan = plans.get(i);
+ ColumnChainInfo columnChainInfo = new ColumnChainInfo();
+ if (plan.getRoots() == null) {
+ log.debug("POLocalRearrange plan is null");
+ return null;
+ } else if (plan.getRoots().size() != 1) {
+ // POLocalRearrange plan contains more than 1 root.
+ // Probably there is an Expression operator in the local
+ // rearrangement plan, skip this plan
+ continue nextPlan;
+ } else {
+ List<Integer> columns = new ArrayList<Integer>();
+ columns
+ .add(rearrange.getIndex()
+ & PigNullableWritable.idxSpace);
+
+ // The first item inside columnChainInfo is set to type Tuple.
+                // This value is not actually in use; it is intended to match
+                // the type of the POProject on the reduce side
+ columnChainInfo.insert(columns, DataType.TUPLE);
+
+ PhysicalOperator node = plan.getRoots().get(0);
+ while (node != null) {
+ if (node instanceof POProject) {
+ POProject project = (POProject) node;
+ if(project.isProjectToEnd()){
+ columnChainInfo.insert(project.getStartCol(),
+ project.getResultType());
+ }else {
+ columnChainInfo.insert(
+ project.getColumns(), project.getResultType());
+ }
+
+ if (plan.getSuccessors(node) == null)
+ node = null;
+ else if (plan.getSuccessors(node).size() != 1) {
+ log.debug(node + " have more than 1 successor");
+ node = null;
+ } else
+ node = plan.getSuccessors(node).get(0);
+ } else
+                        // constant or UDF; skip this plan
+ continue nextPlan;
+ }
+ }
+            // Assume the main key is sorted ascending. We could further
+            // optimize this to match one of the nested sort/distinct keys,
+            // because we do not really care how the cogroup key is sorted; but
+            // that may not hold if we ever switch all the comparators to byte
+            // comparators, so just leave it as it is for now
+ result.insertColumnChainInfo(i, columnChainInfo, true);
+ }
+ return result;
+ }
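+
+    // For example (illustrative): for a rearrange produced by "GROUP A BY
+    // (a, b)", each local-rearrange plan is a single POProject, so the
+    // resulting SortKeyInfo holds one ColumnChainInfo per key column, all
+    // marked ascending.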
+
+ public static SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException {
+ log.trace("Entering SecondaryKeyOptimizerUtil.addSecondaryKeySort");
+ SecondaryKeyOptimizerInfo secKeyOptimizerInfo = new SecondaryKeyOptimizerInfo();
+ List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
+ SortKeyInfo secondarySortKeyInfo = null;
+ List<POToChange> sortsToRemove = null;
+ List<POToChange> distinctsToChange = null;
+
+
+ List<PhysicalOperator> mapLeaves = mapPlan.getLeaves();
+ if (mapLeaves == null || mapLeaves.size() != 1) {
+ log.debug("Expected map to have single leaf! Skip secondary key optimizing");
+ return null;
+ }
+ PhysicalOperator mapLeaf = mapLeaves.get(0);
+
+ // Figure out the main key of the map-reduce job from POLocalRearrange
+ try {
+ if (mapLeaf instanceof POLocalRearrange) {
+ SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) mapLeaf);
+ if (sortKeyInfo == null) {
+ log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+ return null;
+ }
+ sortKeyInfos.add(sortKeyInfo);
+ } else if (mapLeaf instanceof POUnion) {
+ List<PhysicalOperator> preds = mapPlan
+ .getPredecessors(mapLeaf);
+ for (PhysicalOperator pred : preds) {
+ if (pred instanceof POLocalRearrange) {
+ SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) pred);
+ if (sortKeyInfo == null) {
+ log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+ return null;
+ }
+ sortKeyInfos.add(sortKeyInfo);
+ }
+ }
+ } else {
+ log.debug("Cannot find POLocalRearrange or POUnion in map leaf, skip secondary key optimizing");
+ return null;
+ }
+ } catch (ExecException e) {
+ log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+ return null;
+ }
+
+ if (reducePlan.isEmpty()) {
+ log.debug("Reduce plan is empty, skip secondary key optimizing");
+ return null;
+ }
+
+ List<PhysicalOperator> reduceRoots = reducePlan.getRoots();
+ if (reduceRoots.size() != 1) {
+ log.debug("Expected reduce to have single root, skip secondary key optimizing");
+ return null;
+ }
+
+ PhysicalOperator root = reduceRoots.get(0);
+ if (!(root instanceof POPackage)) {
+ log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+ return null;
+ }
+
+ // visit the POForEach of the reduce plan. We can have Limit and Filter
+ // in the middle
+ PhysicalOperator currentNode = root;
+ POForEach foreach = null;
+ while (currentNode != null) {
+ if (currentNode instanceof POPackage
+ && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager)
+ || currentNode instanceof POFilter
+ || currentNode instanceof POLimit) {
+ List<PhysicalOperator> succs = reducePlan
+ .getSuccessors(currentNode);
+ if (succs == null) // We didn't find POForEach
+ return null;
+ if (succs.size() != 1) {
+ log.debug("See multiple output for " + currentNode
+ + " in reduce plan, skip secondary key optimizing");
+ return null;
+ }
+ currentNode = succs.get(0);
+ } else if (currentNode instanceof POForEach) {
+ foreach = (POForEach) currentNode;
+ break;
+ } else { // Skip optimization
+ return null;
+ }
+ }
+
+        // We did not find a foreach (we should never get here; this check is a trick to satisfy findbugs)
+ if (foreach==null)
+ return null;
+
+ sortsToRemove = new ArrayList<POToChange>();
+ distinctsToChange = new ArrayList<POToChange>();
+
+ for (PhysicalPlan innerPlan : foreach.getInputPlans()) {
+ // visit inner plans to figure out the sort order for distinct /
+ // sort
+ SecondaryKeyDiscover innerPlanDiscover = new SecondaryKeyDiscover(
+ innerPlan, sortKeyInfos, secondarySortKeyInfo);
+ try {
+ innerPlanDiscover.process();
+ } catch (FrontendException e) {
+ int errorCode = 2213;
+ throw new VisitorException("Error visiting inner plan for ForEach", errorCode, e);
+ }
+ secondarySortKeyInfo = innerPlanDiscover.getSecondarySortKeyInfo();
+ if (innerPlanDiscover.getSortsToRemove() != null) {
+ for (POSort sort : innerPlanDiscover.getSortsToRemove()) {
+ sortsToRemove.add(new POToChange(sort, innerPlan, foreach));
+ }
+ }
+ if (innerPlanDiscover.getDistinctsToChange() != null) {
+ for (PODistinct distinct : innerPlanDiscover
+ .getDistinctsToChange()) {
+ distinctsToChange.add(new POToChange(distinct, innerPlan,
+ foreach));
+ }
+ }
+ }
+
+ try {
+            // Change PODistinct to POSortedDistinct, which assumes the input
+            // data is sorted
+ for (POToChange distinctToChange : distinctsToChange) {
+ secKeyOptimizerInfo.incrementNumDistinctChanged();
+ PODistinct oldDistinct = (PODistinct) distinctToChange.oper;
+ String scope = oldDistinct.getOperatorKey().scope;
+ POSortedDistinct newDistinct = new POSortedDistinct(
+ new OperatorKey(scope, NodeIdGenerator.getGenerator()
+ .getNextNodeId(scope)), oldDistinct
+ .getRequestedParallelism(), oldDistinct
+ .getInputs());
+ newDistinct.setInputs(oldDistinct.getInputs());
+ newDistinct.setResultType(oldDistinct.getResultType());
+ distinctToChange.plan.replace(oldDistinct, newDistinct);
+ distinctToChange.forEach.getLeaves();
+ }
+            // Remove POSort; if the successor requires a databag, we need to
+            // add a PORelationToExprProject to convert tuples into a databag
+ for (POToChange sortToRemove : sortsToRemove) {
+ secKeyOptimizerInfo.incrementNumSortRemoved();
+ POSort oldSort = (POSort) sortToRemove.oper;
+ String scope = oldSort.getOperatorKey().scope;
+ List<PhysicalOperator> preds = sortToRemove.plan
+ .getPredecessors(sortToRemove.oper);
+ List<PhysicalOperator> succs = sortToRemove.plan
+ .getSuccessors(sortToRemove.oper);
+ POProject project = null;
+ if ((preds == null
+ || preds.get(0).getResultType() != DataType.BAG
+ && oldSort.getResultType() == DataType.BAG) // sort to remove do change the result type
+ && (succs == null || !(succs.get(0) instanceof PORelationToExprProject))) // successor is not PORelationToExprProject
+ {
+ project = new PORelationToExprProject(new OperatorKey(
+ scope, NodeIdGenerator.getGenerator()
+ .getNextNodeId(scope)), oldSort
+ .getRequestedParallelism());
+ project.setInputs(oldSort.getInputs());
+ project.setResultType(DataType.BAG);
+ project.setStar(true);
+ }
+ if (project == null)
+ sortToRemove.plan.removeAndReconnect(sortToRemove.oper);
+ else
+ sortToRemove.plan.replace(oldSort, project);
+ sortToRemove.forEach.getLeaves();
+ }
+ } catch (PlanException e) {
+ int errorCode = 2202;
+ throw new VisitorException(
+ "Error change distinct/sort to use secondary key optimizer",
+ errorCode, e);
+ }
+ if (secondarySortKeyInfo != null) {
+ // Adjust POLocalRearrange, POPackage, MapReduceOper to use the
+ // secondary key
+ secKeyOptimizerInfo.incrementNumUseSecondaryKey();
+ secKeyOptimizerInfo.setUseSecondaryKey(true);
+ secKeyOptimizerInfo.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
+ int indexOfRearrangeToChange = -1;
+ for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo
+ .getColumnChains()) {
+ ColumnInfo currentColumn = columnChainInfo.getColumnInfos()
+ .get(0);
+ int index = currentColumn.getColumns().get(0);
+ if (indexOfRearrangeToChange == -1)
+ indexOfRearrangeToChange = index;
+ else if (indexOfRearrangeToChange != index) {
+ int errorCode = 2203;
+ throw new VisitorException("Sort on columns from different inputs.", errorCode);
+ }
+ }
+ if (mapLeaf instanceof POLocalRearrange) {
+ ((POLocalRearrange) mapLeaf).setUseSecondaryKey(true);
+ setSecondaryPlan(mapPlan, (POLocalRearrange) mapLeaf,
+ secondarySortKeyInfo);
+ } else if (mapLeaf instanceof POUnion) {
+ List<PhysicalOperator> preds = mapPlan
+ .getPredecessors(mapLeaf);
+ boolean found = false;
+ for (PhysicalOperator pred : preds) {
+ POLocalRearrange rearrange = (POLocalRearrange) pred;
+ rearrange.setUseSecondaryKey(true);
+ if (rearrange.getIndex() == indexOfRearrangeToChange) {
+ // Try to find the POLocalRearrange for the secondary key
+ found = true;
+ setSecondaryPlan(mapPlan, rearrange, secondarySortKeyInfo);
+ }
+ }
+ if (!found)
+ {
+ int errorCode = 2214;
+ throw new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
+ }
+ }
+ POPackage pack = (POPackage) root;
+ pack.getPkgr().setUseSecondaryKey(true);
+ }
+ return secKeyOptimizerInfo;
+ }
+
+ private static void setSecondaryPlan(PhysicalPlan plan, POLocalRearrange rearrange,
+ SortKeyInfo secondarySortKeyInfo) throws VisitorException {
+ // Put plan to project secondary key to the POLocalRearrange
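+        // For example (illustrative): if the secondary key is column 1 of the
+        // rearrange input, the secondary plan is a single POProject of column
+        // 1; if the secondary key is the input as a whole, it is a single
+        // star POProject (see below).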
+ try {
+ String scope = rearrange.getOperatorKey().scope;
+ List<PhysicalPlan> secondaryPlanList = new ArrayList<PhysicalPlan>();
+ for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo
+ .getColumnChains()) {
+ PhysicalPlan secondaryPlan = new PhysicalPlan();
+ for (int i = 1; i < columnChainInfo.size(); i++) {
+                // The first item in columnChainInfo indicates the index of the
+                // input, which we have already addressed before we get here
+ ColumnInfo columnInfo = columnChainInfo.getColumnInfo(i);
+ POProject project = new POProject(
+ new OperatorKey(scope, NodeIdGenerator
+ .getGenerator().getNextNodeId(scope)),
+ rearrange.getRequestedParallelism());
+ if(columnInfo.isRangeProject())
+ project.setProjectToEnd(columnInfo.getStartCol());
+ else
+ project
+ .setColumns((ArrayList<Integer>) columnInfo.getColumns());
+ project.setResultType(columnInfo.getResultType());
+ secondaryPlan.addAsLeaf(project);
+ }
+            if (secondaryPlan.isEmpty()) { // The secondary key sorts on the
+                                           // input as a whole
+ POProject project = new POProject(
+ new OperatorKey(scope, NodeIdGenerator
+ .getGenerator().getNextNodeId(scope)),
+ rearrange.getRequestedParallelism());
+ project.setStar(true);
+ secondaryPlan.addAsLeaf(project);
+ }
+ secondaryPlanList.add(secondaryPlan);
+ }
+ rearrange.setSecondaryPlans(secondaryPlanList);
+ } catch (PlanException e) {
+ int errorCode = 2204;
+ throw new VisitorException("Error setting secondary key plan",
+ errorCode, e);
+ }
+ }
+
+    // Find eligible sort and distinct physical operators in the reduce plan.
+    // SecondaryKeyDiscover checks the sort/distinct keys (for distinct, the
+    // key is always the entire bag); if a key is the same as the main key,
+    // we can simply remove the sort/distinct. Otherwise we can have one
+    // secondary sort key: the first such sort/distinct key becomes the
+    // secondary sort key, and subsequent sorts/distincts get no secondary
+    // key optimization because we only have one secondary sort key.
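+    //
+    // Illustrative example (not from this commit): in
+    //   B = GROUP A BY user;
+    //   C = FOREACH B { S = ORDER A BY time; GENERATE group, S; }
+    // the nested ORDER BY can be served by shuffling with "time" as the
+    // secondary sort key, so the POSort inside the foreach can be removed.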
+ private static class SecondaryKeyDiscover {
+ PhysicalPlan mPlan;
+
+ List<POSort> sortsToRemove = new ArrayList<POSort>();
+
+ List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();
+
+ List<SortKeyInfo> sortKeyInfos;
+
+ SortKeyInfo secondarySortKeyInfo;
+
+ ColumnChainInfo columnChainInfo = null;
+
+        // The PhysicalPlan here is the foreach inner plan
+ SecondaryKeyDiscover(PhysicalPlan plan,
+ List<SortKeyInfo> sortKeyInfos, SortKeyInfo secondarySortKeyInfo) {
+ this.mPlan = plan;
+ this.sortKeyInfos = sortKeyInfos;
+ this.secondarySortKeyInfo = secondarySortKeyInfo;
+ }
+
+ public void process() throws FrontendException
+ {
+ List<PhysicalOperator> roots = mPlan.getRoots();
+ for (PhysicalOperator root : roots) {
+ columnChainInfo = new ColumnChainInfo();
+ processRoot(root);
+ }
+ }
+
+ public void processRoot(PhysicalOperator root) throws FrontendException {
+ PhysicalOperator currentNode = root;
+ while (currentNode!=null) {
+ boolean sawInvalidPhysicalOper = false;
+ if (currentNode instanceof PODistinct)
+ sawInvalidPhysicalOper = processDistinct((PODistinct)currentNode);
+ else if (currentNode instanceof POSort)
+ sawInvalidPhysicalOper = processSort((POSort)currentNode);
+ else if (currentNode instanceof POProject)
+ sawInvalidPhysicalOper = processProject((POProject)currentNode);
+ else if (currentNode instanceof POUserFunc ||
+ currentNode instanceof POUnion ||
+ // We don't process foreach, since foreach is too complex to get right
+ currentNode instanceof POForEach)
+ break;
+
+ if (sawInvalidPhysicalOper)
+ break;
+
+ List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
+ if (succs==null)
+ currentNode = null;
+ else {
+ if (succs.size()>1) {
+ int errorCode = 2215;
+ throw new FrontendException("See more than 1 successors in the nested plan for "+currentNode,
+ errorCode);
+ }
+ currentNode = succs.get(0);
+ }
+ }
+ }
+
+ // We see PODistinct, check which key it is using
+ public boolean processDistinct(PODistinct distinct) throws FrontendException {
+ SortKeyInfo keyInfos = new SortKeyInfo();
+ try {
+ keyInfos.insertColumnChainInfo(0,
+ (ColumnChainInfo) columnChainInfo.clone(), true);
+            } catch (CloneNotSupportedException e) { // We implement Cloneable,
+                                                     // impossible to get here
+ }
+
+ // if it is part of main key
+ for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
+ if (sortKeyInfo.moreSpecificThan(keyInfos)) {
+ distinctsToChange.add(distinct);
+ return false;
+ }
+ }
+
+ // if it is part of secondary key
+ if (secondarySortKeyInfo != null
+ && secondarySortKeyInfo.moreSpecificThan(keyInfos)) {
+ distinctsToChange.add(distinct);
+ return false;
+ }
+
+ // Now set the secondary key
+ if (secondarySortKeyInfo == null) {
+ distinctsToChange.add(distinct);
+ secondarySortKeyInfo = keyInfos;
+ }
+ return false;
+ }
+
+ // Accumulate column info
+ public boolean processProject(POProject project) throws FrontendException {
+ columnChainInfo.insertInReduce(project);
+ return false;
+ }
+
+ // We see POSort, check which key it is using
+ public boolean processSort(POSort sort) throws FrontendException{
+ SortKeyInfo keyInfo = new SortKeyInfo();
+ for (int i = 0; i < sort.getSortPlans().size(); i++) {
+ PhysicalPlan sortPlan = sort.getSortPlans().get(i);
+ ColumnChainInfo sortChainInfo = null;
+ try {
+ sortChainInfo = (ColumnChainInfo) columnChainInfo.clone();
+ } catch (CloneNotSupportedException e) { // We implement
+                                                         // Cloneable, impossible
+ // to get here
+ }
+ boolean r = false;
+ try {
+ r = collectColumnChain(sortPlan, sortChainInfo);
+ } catch (PlanException e) {
+ int errorCode = 2206;
+ throw new FrontendException("Error visiting POSort inner plan",
+ errorCode, e);
+ }
+                if (r) // if we saw a physical operator other than a project in the sort plan
+ {
+ return true;
+ }
+ keyInfo.insertColumnChainInfo(i, sortChainInfo, sort
+ .getMAscCols().get(i));
+ }
+ // if it is part of main key
+ for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
+ if (sortKeyInfo.moreSpecificThan(keyInfo)) {
+ sortsToRemove.add(sort);
+ return false;
+ }
+ }
+ // if it is part of secondary key
+ if (secondarySortKeyInfo != null
+ && secondarySortKeyInfo.moreSpecificThan(keyInfo)) {
+ sortsToRemove.add(sort);
+ return false;
+ }
+
+ // Now set the secondary key
+ if (secondarySortKeyInfo == null) {
+ sortsToRemove.add(sort);
+ secondarySortKeyInfo = keyInfo;
+ }
+ return false;
+ }
+
+ public List<POSort> getSortsToRemove() {
+ return sortsToRemove;
+ }
+
+ public List<PODistinct> getDistinctsToChange() {
+ return distinctsToChange;
+ }
+
+ public SortKeyInfo getSecondarySortKeyInfo() {
+ return secondarySortKeyInfo;
+ }
+ }
+
+ // Return true if we saw physical operators other than project in the plan
+ static private boolean collectColumnChain(PhysicalPlan plan,
+ ColumnChainInfo columnChainInfo) throws PlanException {
+ if (plan.getRoots().size() != 1) {
+ return true;
+ }
+
+ PhysicalOperator currentNode = plan.getRoots().get(0);
+
+ while (currentNode != null) {
+ if (currentNode instanceof POProject) {
+ POProject project = (POProject) currentNode;
+ columnChainInfo.insertInReduce(project);
+ } else {
+ return true;
+ }
+ List<PhysicalOperator> succs = plan.getSuccessors(currentNode);
+ if (succs == null)
+ break;
+ if (succs.size() != 1) {
+ int errorCode = 2208;
+ throw new PlanException(
+ "Exception visiting foreach inner plan", errorCode);
+ }
+ currentNode = succs.get(0);
+ }
+ return false;
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Fri May 30 19:07:23 2014
@@ -97,6 +97,7 @@ public class HadoopExecutableManager ext
scriptLogDir = job.get("pig.streaming.log.dir", "_logs");
// Save the taskid
+ // TODO Get an equivalent property in Tez mode (currently this returns null)
taskId = job.get("mapred.task.id");
}
@@ -129,6 +130,9 @@ public class HadoopExecutableManager ext
super.close();
// Copy the secondary outputs of the task to HDFS
+ if (this.scriptOutputDir==null) {
+ return;
+ }
Path scriptOutputDir = new Path(this.scriptOutputDir);
FileSystem fs = scriptOutputDir.getFileSystem(job);
List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
@@ -171,7 +175,7 @@ public class HadoopExecutableManager ext
* HDFS, <code>false</code> otherwise
*/
private boolean writeErrorToHDFS(int limit, String taskId) {
- if (command.getPersistStderr()) {
+ if (command.getPersistStderr() && taskId != null) {
int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
return tipId < command.getLogFilesLimit();
}
Modified: pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/trunk/src/org/apache/pig/data/BinInterSedes.java Fri May 30 19:07:23 2014
@@ -688,10 +688,6 @@ public class BinInterSedes implements In
@Override
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
- return;
- }
try {
mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
mSecondaryAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.secondarySortOrder"));
Modified: pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Fri May 30 19:07:23 2014
@@ -51,7 +51,7 @@ public abstract class DefaultAbstractBag
// If we grow past 100K, may be worthwhile to register.
private static final int SPILL_REGISTER_THRESHOLD = 100 * 1024;
- private static PigLogger pigLogger = PhysicalOperator.getPigLogger();
+ private static PigLogger pigLogger;
private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
// Container that holds the tuples. Actual object instantiated by
@@ -85,7 +85,7 @@ public abstract class DefaultAbstractBag
/**
- * Sample every SPILL_SAMPLE_FREQUENCYth tuple
+ * Sample every SPILL_SAMPLE_FREQUENCYth tuple
* until we reach a max of SPILL_SAMPLE_SIZE
* to get an estimate of the tuple sizes.
*/
Modified: pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultTuple.java Fri May 30 19:07:23 2014
@@ -230,13 +230,8 @@ public class DefaultTuple extends Abstra
@Override
public void setConf(Configuration conf) {
- if (!(conf instanceof JobConf)) {
- mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
- return;
- }
- JobConf jconf = (JobConf) conf;
try {
- mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+ mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
} catch (IOException ioe) {
mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
throw new RuntimeException(ioe);
Modified: pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/ReadOnceBag.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/ReadOnceBag.java Fri May 30 19:07:23 2014
@@ -38,15 +38,15 @@ public class ReadOnceBag implements Data
// The Packager that created this
protected Packager pkgr;
-
+
//The iterator of Tuples. Marked transient because we will never serialize this.
protected transient Iterator<NullableTuple> tupIter;
-
+
// The key being worked on
protected PigNullableWritable keyWritable;
/**
- *
+ *
*/
private static final long serialVersionUID = 2L;
@@ -75,7 +75,7 @@ public class ReadOnceBag implements Data
/* (non-Javadoc)
* @see org.apache.pig.impl.util.Spillable#spill()
-
+
*/
@Override
public long spill() {
@@ -87,7 +87,7 @@ public class ReadOnceBag implements Data
*/
@Override
public void add(Tuple t) {
- throw new RuntimeException("ReadOnceBag does not support add operation");
+ throw new RuntimeException("ReadOnceBag does not support add operation");
}
/* (non-Javadoc)
@@ -131,7 +131,7 @@ public class ReadOnceBag implements Data
}
/* (non-Javadoc)
- * @see org.apache.pig.data.DataBag#markStale(boolean)
+ * @see org.apache.pig.data.DataBag#markStale(boolean)
*/
@Override
public void markStale(boolean stale) {
@@ -166,7 +166,7 @@ public class ReadOnceBag implements Data
/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
- * This has to be defined since DataBag implements
+ * This has to be defined since DataBag implements
* Comparable although, in this case we cannot really compare.
*/
@Override
@@ -202,7 +202,7 @@ public class ReadOnceBag implements Data
@Override
public int hashCode() {
- int hash = 7;
+ int hash = 7;
if (pkgr.getKeyTuple())
{
hash = hash * 31 + pkgr.getKeyAsTuple().hashCode();
@@ -236,18 +236,18 @@ public class ReadOnceBag implements Data
ret = pkgr.getValueTuple(keyWritable, ntup, index);
} catch (ExecException e)
{
- throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString());
+ throw new RuntimeException("ReadOnceBag failed to get value tuple : "+e.toString());
}
return ret;
}
-
+
/* (non-Javadoc)
* @see java.util.Iterator#remove()
*/
@Override
public void remove() {
- throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
+ throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
}
- }
+ }
}
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Fri May 30 19:07:23 2014
@@ -148,7 +148,7 @@ public class PigContext implements Seria
private static ThreadLocal<ArrayList<String>> packageImportList =
new ThreadLocal<ArrayList<String>>();
- private static ThreadLocal<Map<String,Class<?>>> classCache =
+ private static ThreadLocal<Map<String,Class<?>>> classCache =
new ThreadLocal<Map<String,Class<?>>>();
private Properties log4jProperties = new Properties();
@@ -242,14 +242,14 @@ public class PigContext implements Seria
this(ExecType.MAPREDUCE, new Properties());
}
- public PigContext(Configuration conf) throws PigException {
- this(ConfigurationUtil.toProperties(conf));
- }
-
- public PigContext(Properties properties) throws PigException {
- this(ExecTypeProvider.selectExecType(properties), properties);
- }
-
+ public PigContext(Configuration conf) throws PigException {
+ this(ConfigurationUtil.toProperties(conf));
+ }
+
+ public PigContext(Properties properties) throws PigException {
+ this(ExecTypeProvider.selectExecType(properties), properties);
+ }
+
public PigContext(ExecType execType, Configuration conf) {
this(execType, ConfigurationUtil.toProperties(conf));
}
@@ -276,7 +276,7 @@ public class PigContext implements Seria
skippedShipPaths.add("/sbin");
skippedShipPaths.add("/usr/sbin");
skippedShipPaths.add("/usr/local/sbin");
-
+
macros = new HashMap<String, Tree>();
scriptingUDFs = new HashMap<String, String>();
@@ -376,10 +376,10 @@ public class PigContext implements Seria
}
/**
- * Adds the specified path to the predeployed jars list. These jars will
+ * Adds the specified path to the predeployed jars list. These jars will
* never be included in generated job jar.
* <p>
- * This can be called for jars that are pre-installed on the Hadoop
+ * This can be called for jars that are pre-installed on the Hadoop
* cluster to reduce the size of the job jar.
*/
public void markJarAsPredeployed(String path) {
@@ -644,24 +644,24 @@ public class PigContext implements Seria
c = new HashMap<String,Class<?>>();
classCache.set(c);
}
-
+
return c;
}
-
+
@SuppressWarnings("rawtypes")
public static Class resolveClassName(String name) throws IOException{
- Map<String,Class<?>> cache = getClassCache();
-
+ Map<String,Class<?>> cache = getClassCache();
+
Class c = cache.get(name);
if (c != null) {
return c;
}
-
+
for(String prefix: getPackageImportList()) {
try {
c = Class.forName(prefix+name,true, PigContext.classloader);
cache.put(name, c);
-
+
return c;
}
catch (ClassNotFoundException e) {
@@ -868,11 +868,7 @@ public class PigContext implements Seria
* @return error source
*/
public byte getErrorSource() {
- if(execType == ExecType.LOCAL || execType == ExecType.MAPREDUCE) {
- return PigException.REMOTE_ENVIRONMENT;
- } else {
- return PigException.BUG;
- }
+ return PigException.REMOTE_ENVIRONMENT;
}
public static ArrayList<String> getPackageImportList() {
Modified: pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Fri May 30 19:07:23 2014
@@ -40,278 +40,268 @@ import org.apache.pig.impl.util.Pair;
 * sampling process. It figures out how many reducers are required to process a
 * skewed key without causing spill, and allocates this number of reducers to the
 * key. This UDF outputs a map which contains 2 keys:
- *
+ *
* <li>"totalreducers": the value is an integer wich indicates the
* number of total reducers for this join job </li>
* <li>"partition.list": the value is a bag which contains a
* list of tuples with each tuple representing partitions for a skewed key.
- * The tuple has format of <join key>,<min index of reducer>,
+ * The tuple has format of <join key>,<min index of reducer>,
* <max index of reducer> </li>
- *
- * For example, a join job configures 10 reducers, and the sampling process
+ *
+ * For example, a join job configures 10 reducers, and the sampling process
* finds out 2 skewed keys, "swpv" needs 4 reducers and "swps"
* needs 2 reducers. The output file would be like following:
- *
+ *
* {totalreducers=10, partition.list={(swpv,0,3), (swps,4,5)}}
*
- * The name of this file is set into next MR job which does the actual join.
+ * The name of this file is set into the next MR job, which does the actual join.
* That job uses this information to partition skewed keys properly
- *
+ *
*/
public class PartitionSkewedKeys extends EvalFunc<Map<String, Object>> {
- public static final String PARTITION_LIST = "partition.list";
+ public static final String PARTITION_LIST = "partition.list";
- public static final String TOTAL_REDUCERS = "totalreducers";
+ public static final String TOTAL_REDUCERS = "totalreducers";
- public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
+ public static final float DEFAULT_PERCENT_MEMUSAGE = 0.3f;
- private Log log = LogFactory.getLog(getClass());
+ private Log log = LogFactory.getLog(getClass());
- BagFactory mBagFactory = BagFactory.getInstance();
+ BagFactory mBagFactory = BagFactory.getInstance();
- TupleFactory mTupleFactory = TupleFactory.getInstance();
+ TupleFactory mTupleFactory = TupleFactory.getInstance();
- private int currentIndex_;
+ private int currentIndex_;
- private int totalReducers_;
+ private int totalReducers_;
- private long totalMemory_;
+ private long totalMemory_;
- private String inputFile_;
+ private long totalSampleCount_;
- private long totalSampleCount_;
+ private double heapPercentage_;
- private double heapPercentage_;
-
    // specify how many tuples a reducer can hold for a key
// this is for testing purpose. If not specified, then
// it is calculated based on memory size and size of tuple
- private int tupleMCount_;
+ private int tupleMCount_;
- public PartitionSkewedKeys() {
- this(null);
- }
-
- public PartitionSkewedKeys(String[] args) {
- totalReducers_ = -1;
- currentIndex_ = 0;
-
- if (args != null && args.length > 0) {
- heapPercentage_ = Double.parseDouble(args[0]);
- tupleMCount_ = Integer.parseInt(args[1]);
- inputFile_ = args[2];
- } else {
- heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
- log.debug("input file: " + inputFile_);
- }
-
- log.info("input file: " + inputFile_);
-
- }
-
- /**
- * first field in the input tuple is the number of reducers
- *
- * second field is the *sorted* bag of samples
- * this should be called only once
- */
- public Map<String, Object> exec(Tuple in) throws IOException {
- if (in == null || in.size() == 0) {
- return null;
- }
- Map<String, Object> output = new HashMap<String, Object>();
-
- totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
- log.info("Maximum of available memory is " + totalMemory_);
-
- ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
-
- Tuple currentTuple = null;
- long count = 0;
-
- // total size in memory for tuples in sample
- long totalSampleMSize = 0;
-
- //total input rows for the join
- long totalInputRows = 0;
-
- try {
- totalReducers_ = (Integer) in.get(0);
- DataBag samples = (DataBag) in.get(1);
-
- totalSampleCount_ = samples.size();
-
- log.info("totalSample: " + totalSampleCount_);
- log.info("totalReducers: " + totalReducers_);
-
- int maxReducers = 0;
-
- // first iterate the samples to find total number of rows
- Iterator<Tuple> iter1 = samples.iterator();
- while (iter1.hasNext()) {
- Tuple t = iter1.next();
- totalInputRows += (Long)t.get(t.size() - 1);
- }
-
- // now iterate samples to do the reducer calculation
- Iterator<Tuple> iter2 = samples.iterator();
- while (iter2.hasNext()) {
- Tuple t = iter2.next();
- if (hasSameKey(currentTuple, t) || currentTuple == null) {
- count++;
- totalSampleMSize += getMemorySize(t);
- } else {
- Pair<Tuple, Integer> p = calculateReducers(currentTuple,
- count, totalSampleMSize, totalInputRows);
- Tuple rt = p.first;
- if (rt != null) {
- reducerList.add(rt);
- }
- if (maxReducers < p.second) {
- maxReducers = p.second;
- }
- count = 1;
- totalSampleMSize = getMemorySize(t);
- }
-
- currentTuple = t;
- }
-
- // add last key
- if (count > 0) {
- Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
- totalSampleMSize, totalInputRows);
- Tuple rt = p.first;
- if (rt != null) {
- reducerList.add(rt);
- }
- if (maxReducers < p.second) {
- maxReducers = p.second;
- }
- }
-
- if (maxReducers > totalReducers_) {
- if(pigLogger != null) {
- pigLogger.warn(this,"You need at least " + maxReducers
- + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
- } else {
- log.warn("You need at least " + maxReducers
- + " reducers to avoid spillage and run this job efficiently.");
- }
- }
-
- output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
- output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
-
- log.info(output.toString());
- if (log.isDebugEnabled()) {
- log.debug(output.toString());
- }
-
- return output;
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
- long count, long totalMSize, long totalTuples) {
- // get average memory size per tuple
- double avgM = totalMSize / (double) count;
-
- // get the number of tuples that can fit into memory
- long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
-
- // estimate the number of total tuples for this key
- long keyTupleCount = (long) ( ((double) count/ totalSampleCount_) *
- totalTuples);
-
-
- int redCount = (int) Math.round(Math.ceil((double) keyTupleCount
- / tupleMCount));
-
- if (log.isDebugEnabled())
- {
- log.debug("avgM: " + avgM);
- log.debug("tuple count: " + keyTupleCount);
- log.debug("count: " + count);
- log.debug("A reducer can take " + tupleMCount + " tuples and "
- + keyTupleCount + " tuples are find for " + currentTuple);
- log.debug("key " + currentTuple + " need " + redCount + " reducers");
- }
-
- // this is not a skewed key
- if (redCount <= 1) {
- return new Pair<Tuple, Integer>(null, 1);
- }
-
- Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
- int i = 0;
- try {
- // set keys
- for (; i < currentTuple.size() - 2; i++) {
- t.set(i, currentTuple.get(i));
- }
-
- int effectiveRedCount = redCount > totalReducers_? totalReducers_:redCount;
- // set the min index of reducer for this key
- t.set(i++, currentIndex_);
- currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
- if (currentIndex_ < 0) {
- currentIndex_ += totalReducers_;
- }
- // set the max index of reducer for this key
- t.set(i++, currentIndex_);
- } catch (ExecException e) {
- throw new RuntimeException("Failed to set value to tuple." + e);
- }
-
- currentIndex_ = (currentIndex_ + 1) % totalReducers_;
-
- Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
-
- return p;
- }
-
- // the last field of the tuple is a tuple for memory size and disk size
- private long getMemorySize(Tuple t) {
- int s = t.size();
- try {
- return (Long) t.get(s - 2);
- } catch (ExecException e) {
- throw new RuntimeException(
- "Unable to retrive the size field from tuple.", e);
- }
- }
-
-
- private boolean hasSameKey(Tuple t1, Tuple t2) {
- // Have to break the tuple down and compare it field to field.
- int sz1 = t1 == null ? 0 : t1.size();
- int sz2 = t2 == null ? 0 : t2.size();
- if (sz2 != sz1) {
- return false;
- }
-
- for (int i = 0; i < sz1 - 2; i++) {
- try {
- int c = DataType.compare(t1.get(i), t2.get(i));
- if (c != 0) {
- return false;
- }
- } catch (ExecException e) {
- throw new RuntimeException("Unable to compare tuples", e);
- }
- }
+ public PartitionSkewedKeys() {
+ this(null);
+ }
+
+ public PartitionSkewedKeys(String[] args) {
+ totalReducers_ = -1;
+ currentIndex_ = 0;
+
+ if (args != null && args.length > 0) {
+ heapPercentage_ = Double.parseDouble(args[0]);
+ tupleMCount_ = Integer.parseInt(args[1]);
+ } else {
+ heapPercentage_ = DEFAULT_PERCENT_MEMUSAGE;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("pig.skewedjoin.reduce.memusage=" + heapPercentage_);
+ }
+ }
+
+ /**
+ * first field in the input tuple is the number of reducers
+ *
+ * second field is the *sorted* bag of samples
+ * this should be called only once
+ */
+ public Map<String, Object> exec(Tuple in) throws IOException {
+ if (in == null || in.size() == 0) {
+ return null;
+ }
+ Map<String, Object> output = new HashMap<String, Object>();
+
+ totalMemory_ = (long) (Runtime.getRuntime().maxMemory() * heapPercentage_);
+ log.info("Maximum of available memory is " + totalMemory_);
+
+ ArrayList<Tuple> reducerList = new ArrayList<Tuple>();
+
+ Tuple currentTuple = null;
+ long count = 0;
+
+ // total size in memory for tuples in sample
+ long totalSampleMSize = 0;
+
+ //total input rows for the join
+ long totalInputRows = 0;
+
+ try {
+ totalReducers_ = (Integer) in.get(0);
+ DataBag samples = (DataBag) in.get(1);
+
+ totalSampleCount_ = samples.size();
+
+ log.info("totalSample: " + totalSampleCount_);
+ log.info("totalReducers: " + totalReducers_);
+
+ int maxReducers = 0;
+
+ // first iterate the samples to find total number of rows
+ Iterator<Tuple> iter1 = samples.iterator();
+ while (iter1.hasNext()) {
+ Tuple t = iter1.next();
+ totalInputRows += (Long)t.get(t.size() - 1);
+ }
+
+ // now iterate samples to do the reducer calculation
+ Iterator<Tuple> iter2 = samples.iterator();
+ while (iter2.hasNext()) {
+ Tuple t = iter2.next();
+ if (hasSameKey(currentTuple, t) || currentTuple == null) {
+ count++;
+ totalSampleMSize += getMemorySize(t);
+ } else {
+ Pair<Tuple, Integer> p = calculateReducers(currentTuple,
+ count, totalSampleMSize, totalInputRows);
+ Tuple rt = p.first;
+ if (rt != null) {
+ reducerList.add(rt);
+ }
+ if (maxReducers < p.second) {
+ maxReducers = p.second;
+ }
+ count = 1;
+ totalSampleMSize = getMemorySize(t);
+ }
+
+ currentTuple = t;
+ }
+
+ // add last key
+ if (count > 0) {
+ Pair<Tuple, Integer> p = calculateReducers(currentTuple, count,
+ totalSampleMSize, totalInputRows);
+ Tuple rt = p.first;
+ if (rt != null) {
+ reducerList.add(rt);
+ }
+ if (maxReducers < p.second) {
+ maxReducers = p.second;
+ }
+ }
+
+ if (maxReducers > totalReducers_) {
+ if(pigLogger != null) {
+ pigLogger.warn(this,"You need at least " + maxReducers
+ + " reducers to avoid spillage and run this job efficiently.", PigWarning.REDUCER_COUNT_LOW);
+ } else {
+ log.warn("You need at least " + maxReducers
+ + " reducers to avoid spillage and run this job efficiently.");
+ }
+ }
+
+ output.put(PARTITION_LIST, mBagFactory.newDefaultBag(reducerList));
+ output.put(TOTAL_REDUCERS, Integer.valueOf(totalReducers_));
+
+ log.info(output.toString());
+ if (log.isDebugEnabled()) {
+ log.debug(output.toString());
+ }
+
+ return output;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Pair<Tuple, Integer> calculateReducers(Tuple currentTuple,
+ long count, long totalMSize, long totalTuples) {
+ // get average memory size per tuple
+ double avgM = totalMSize / (double) count;
+
+ // get the number of tuples that can fit into memory
+ long tupleMCount = (tupleMCount_ <= 0)?(long) (totalMemory_ / avgM): tupleMCount_;
+
+ // estimate the number of total tuples for this key
+ long keyTupleCount = (long) ( ((double) count/ totalSampleCount_) * totalTuples);
+
+ int redCount = (int) Math.round(Math.ceil((double) keyTupleCount / tupleMCount));
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("avgM: " + avgM);
+ log.debug("tuple count: " + keyTupleCount);
+ log.debug("count: " + count);
+ log.debug("A reducer can take " + tupleMCount + " tuples and "
+ + keyTupleCount + " tuples are find for " + currentTuple);
+ log.debug("key " + currentTuple + " need " + redCount + " reducers");
+ }
+
+ // this is not a skewed key
+ if (redCount <= 1) {
+ return new Pair<Tuple, Integer>(null, 1);
+ }
+
+ Tuple t = this.mTupleFactory.newTuple(currentTuple.size());
+ int i = 0;
+ try {
+ // set keys
+ for (; i < currentTuple.size() - 2; i++) {
+ t.set(i, currentTuple.get(i));
+ }
+
+ int effectiveRedCount = redCount > totalReducers_? totalReducers_:redCount;
+ // set the min index of reducer for this key
+ t.set(i++, currentIndex_);
+ currentIndex_ = (currentIndex_ + effectiveRedCount) % totalReducers_ - 1;
+ if (currentIndex_ < 0) {
+ currentIndex_ += totalReducers_;
+ }
+ // set the max index of reducer for this key
+ t.set(i++, currentIndex_);
+ } catch (ExecException e) {
+ throw new RuntimeException("Failed to set value to tuple." + e);
+ }
+
+ currentIndex_ = (currentIndex_ + 1) % totalReducers_;
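+
+        // Worked example (illustrative, matching the class javadoc): with 10
+        // reducers and currentIndex_ starting at 0, a key needing 4 reducers
+        // is assigned the range [0, 3] and currentIndex_ advances to 4; the
+        // next key needing 2 reducers gets [4, 5], yielding
+        // {(swpv,0,3), (swps,4,5)}.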
+
+ Pair<Tuple, Integer> p = new Pair<Tuple, Integer>(t, redCount);
+
+ return p;
+ }
+
+ // the last field of the tuple is a tuple for memory size and disk size
+ private long getMemorySize(Tuple t) {
+ int s = t.size();
+ try {
+ return (Long) t.get(s - 2);
+ } catch (ExecException e) {
+ throw new RuntimeException(
+ "Unable to retrive the size field from tuple.", e);
+ }
+ }
+
+
+ private boolean hasSameKey(Tuple t1, Tuple t2) {
+ // Have to break the tuple down and compare it field to field.
+ int sz1 = t1 == null ? 0 : t1.size();
+ int sz2 = t2 == null ? 0 : t2.size();
+ if (sz2 != sz1) {
+ return false;
+ }
+
+ for (int i = 0; i < sz1 - 2; i++) {
+ try {
+ int c = DataType.compare(t1.get(i), t2.get(i));
+ if (c != 0) {
+ return false;
+ }
+ } catch (ExecException e) {
+ throw new RuntimeException("Unable to compare tuples", e);
+ }
+ }
- return true;
- }
+ return true;
+ }
}
Modified: pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Fri May 30 19:07:23 2014
@@ -18,71 +18,57 @@
package org.apache.pig.impl.builtin;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Properties;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.util.Pair;
/**
* See "Skewed Join sampler" in http://wiki.apache.org/pig/PigSampler
*/
public class PoissonSampleLoader extends SampleLoader {
-
+
    // marker string to mark the last sample row, which has the total number of rows
// seen by this map instance
// this string will be in the 2nd last column of the last sample row
// it is used by GetMemNumRows
- public static final String NUMROWS_TUPLE_MARKER =
+ public static final String NUMROWS_TUPLE_MARKER =
"\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
-
+
//num of rows sampled so far
private int numRowsSampled = 0;
-
+
//average size of tuple in memory, for tuples sampled
private long avgTupleMemSz = 0;
-
- //current row number
+
+ //current row number
private long rowNum = 0;
-
+
// number of tuples to skip after each sample
- long skipInterval = -1;
+ private long skipInterval = -1;
- // bytes in input to skip after every sample.
- // divide this by avgTupleMemSize to get skipInterval
+ // bytes in input to skip after every sample.
+ // divide this by avgTupleMemSize to get skipInterval
private long memToSkipPerSample = 0;
-
+
// has the special row with row number information been returned
private boolean numRowSplTupleReturned = false;
-
- /// For a given mean and a confidence, a sample rate is obtained from a poisson cdf
- private static final String SAMPLE_RATE = "pig.sksampler.samplerate";
-
+
    // 17 is not a magic number. It can be obtained by using a Poisson cumulative distribution function with the mean
    // set to 10 (empirically, the minimum number of samples) and the confidence set to 95%
private static final int DEFAULT_SAMPLE_RATE = 17;
-
+
private int sampleRate = DEFAULT_SAMPLE_RATE;
-
- /// % of memory available for the input data. This is currenty equal to the memory available
- /// for the skewed join
- private static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE;
-
+
// new Sample tuple
private Tuple newSample = null;
-
-// private final Log log = LogFactory.getLog(getClass());
-
+
public PoissonSampleLoader(String funcSpec, String ns) {
super(funcSpec);
super.setNumSamples(Integer.valueOf(ns)); // will be overridden
@@ -91,18 +77,18 @@ public class PoissonSampleLoader extends
@Override
public Tuple getNext() throws IOException {
if(numRowSplTupleReturned){
- // row num special row has been returned after all inputs
- // were read, nothing more to read
+ // row num special row has been returned after all inputs
+ // were read, nothing more to read
return null;
}
-
if(skipInterval == -1){
//select first tuple as sample and calculate
- // number of tuples to be skipped
+ // number of tuples to be skipped
Tuple t = loader.getNext();
- if(t == null)
+ if(t == null) {
return createNumRowTuple(null);
+ }
long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
memToSkipPerSample = availRedMem/sampleRate;
updateSkipInterval(t);
@@ -121,8 +107,9 @@ public class PoissonSampleLoader extends
// skipped enough, get new sample
Tuple t = loader.getNext();
- if(t == null)
+ if(t == null) {
return createNumRowTuple(newSample);
+ }
updateSkipInterval(t);
rowNum++;
Tuple currentSample = newSample;
@@ -136,14 +123,14 @@ public class PoissonSampleLoader extends
* @param t - tuple
*/
private void updateSkipInterval(Tuple t) {
- avgTupleMemSz =
+ avgTupleMemSz =
((avgTupleMemSz*numRowsSampled) + t.getMemorySize())/(numRowsSampled + 1);
skipInterval = memToSkipPerSample/avgTupleMemSz;
-
- // skipping fewer number of rows the first few times, to reduce
- // the probability of first tuples size (if much smaller than rest)
- // resulting in
- // very few samples being sampled. Sampling a little extra is OK
+
+        // Skip fewer rows the first few times, to reduce the probability that
+        // the first tuple's size (if much smaller than the rest) results in
+        // very few samples being taken. Sampling a little extra is OK
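+        // Illustrative numbers: with ~100MB of memory to cover per sample and
+        // an average tuple size of ~1KB, skipInterval works out to roughly
+        // 100,000 rows between samples.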
if(numRowsSampled < 5)
skipInterval = skipInterval/(10-numRowsSampled);
++numRowsSampled;
@@ -157,21 +144,22 @@ public class PoissonSampleLoader extends
*/
private Tuple createNumRowTuple(Tuple sample) throws ExecException {
int sz = (sample == null) ? 0 : sample.size();
- TupleFactory factory = TupleFactory.getInstance();
+ TupleFactory factory = TupleFactory.getInstance();
Tuple t = factory.newTuple(sz + 2);
-
+
if (sample != null) {
for(int i=0; i<sample.size(); i++){
t.set(i, sample.get(i));
}
}
-
+
t.set(sz, NUMROWS_TUPLE_MARKER);
t.set(sz + 1, rowNum);
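+        // Illustrative resulting layout for a 2-field sample (f0, f1):
+        //   (f0, f1, NUMROWS_TUPLE_MARKER, totalRowsSeen)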
numRowSplTupleReturned = true;
return t;
}
+ @SuppressWarnings("rawtypes")
@Override
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
super.prepareToRead(reader, split);
@@ -184,8 +172,9 @@ public class PoissonSampleLoader extends
newSample = null;
Configuration conf = split.getConf();
- sampleRate = conf.getInt(SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
- heapPerc = conf.getFloat(PERC_MEM_AVAIL, PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
+ sampleRate = conf.getInt(PigConfiguration.SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
+ heapPerc = conf.getFloat(PigConfiguration.PERC_MEM_AVAIL,
+ PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
}
}
Modified: pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java Fri May 30 19:07:23 2014
@@ -55,7 +55,15 @@ public class NullablePartitionWritable e
return partitionIndex;
}
- @Override
+ @Override
+ public NullablePartitionWritable clone() throws CloneNotSupportedException {
+ NullablePartitionWritable clone = new NullablePartitionWritable();
+ clone.setKey(this.getKey());
+ clone.partitionIndex = this.partitionIndex;
+ return clone;
+ }
+
+ @Override
public int compareTo(Object o) {
return key.compareTo(((NullablePartitionWritable)o).getKey());
}
Modified: pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java Fri May 30 19:07:23 2014
@@ -42,6 +42,12 @@ public class NullableTuple extends PigNu
mValue = t;
}
+ public NullableTuple(NullableTuple copy) {
+ setNull(copy.isNull());
+ mValue = copy.mValue;
+ setIndex(copy.getIndex());
+ }
+
@Override
public Object getValueAsPigType() {
return isNull() ? null : (Tuple)mValue;
Modified: pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java Fri May 30 19:07:23 2014
@@ -40,7 +40,7 @@ import org.apache.pig.data.Tuple;
//Put in to make the compiler not complain about WritableComparable
//being a generic type.
@SuppressWarnings("unchecked")
-public abstract class PigNullableWritable implements WritableComparable {
+public abstract class PigNullableWritable implements WritableComparable, Cloneable {
/**
* indices in multiquery optimized maps
@@ -61,6 +61,19 @@ public abstract class PigNullableWritabl
private byte mIndex;
+ @Override
+ public PigNullableWritable clone() throws CloneNotSupportedException {
+ try {
+ PigNullableWritable clone = this.getClass().newInstance();
+ clone.mNull = this.mNull;
+ clone.mValue = this.mValue;
+ clone.mIndex = this.mIndex;
+ return clone;
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while cloning " + this, e);
+ }
+ }
+
/**
* Compare two nullable objects. Step one is to check if either or both
* are null. If one is null and the other is not, then the one that is
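The reflective base clone() assumes each concrete subclass has an
accessible no-arg constructor (which these Writables already need for
deserialization); subclasses carrying extra state, such as
NullablePartitionWritable above, still override it. A usage sketch
(someTuple stands in for any Tuple):

    PigNullableWritable key = new NullableTuple(someTuple);
    PigNullableWritable buffered = key.clone(); // may throw CloneNotSupportedException
    // buffered is a fresh instance of the same runtime class carrying the
    // same null flag, value reference and multiquery index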
Modified: pig/trunk/src/org/apache/pig/impl/plan/NodeIdGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/NodeIdGenerator.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/NodeIdGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/NodeIdGenerator.java Fri May 30 19:07:23 2014
@@ -18,37 +18,45 @@
package org.apache.pig.impl.plan;
-import java.util.Map;
import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
public class NodeIdGenerator {
private Map<String, Long> scopeToIdMap;
private static NodeIdGenerator theGenerator = new NodeIdGenerator();
-
+
private NodeIdGenerator() {
scopeToIdMap = new HashMap<String, Long>();
}
-
+
public static NodeIdGenerator getGenerator() {
return theGenerator;
}
-
+
public long getNextNodeId(String scope) {
Long val = scopeToIdMap.get(scope);
-
+
long nextId = 0;
-
+
if (val != null) {
nextId = val.longValue();
}
scopeToIdMap.put(scope, nextId + 1);
-
+
return nextId;
}
+ @VisibleForTesting
public static void reset(String scope) {
theGenerator.scopeToIdMap.put(scope, 0L) ;
}
+
+ @VisibleForTesting
+ public static void reset() {
+ theGenerator.scopeToIdMap.clear();
+ }
}
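For illustration, the per-scope counter behaviour together with the new
test-only reset() (scope names are illustrative):

    NodeIdGenerator gen = NodeIdGenerator.getGenerator();
    long a0 = gen.getNextNodeId("scope-a"); // 0
    long a1 = gen.getNextNodeId("scope-a"); // 1
    long b0 = gen.getNextNodeId("scope-b"); // 0 -- ids are tracked per scope

    NodeIdGenerator.reset();                // test-only: clears every scope
    long a2 = gen.getNextNodeId("scope-a"); // 0 again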
Modified: pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Fri May 30 19:07:23 2014
@@ -282,6 +282,32 @@ public abstract class OperatorPlan<E ext
}
}
}
+
+ /**
+ * Move a given operator and everything below it into a new operator plan. The specified
+ * operator becomes the root of the new operator plan.
+ * @param root Operator to move, together with everything below it
+ * @param newPlan new operator plan to move things into
+ * @throws PlanException
+ */
+ public void moveTree(E root, OperatorPlan<E> newPlan) throws PlanException {
+ newPlan.add(root);
+ if (getSuccessors(root) == null) {
+ remove(root);
+ return;
+ }
+
+ List<E> succs = new ArrayList<E>();
+ succs.addAll(getSuccessors(root));
+
+ for (E succ : succs) {
+ moveTree(succ, newPlan);
+ }
+ remove(root);
+ for (E succ : succs) {
+ newPlan.connect(root, succ);
+ }
+ }
/**
* Trim everything above a given operator. The specified operator will
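A sketch of what the new moveTree() does to a plan A -> B -> C when
called on B (sourcePlan, targetPlan and opB are illustrative names):

    // before:  sourcePlan = { A -> B -> C },  targetPlan = { }
    sourcePlan.moveTree(opB, targetPlan); // may throw PlanException
    // after:   sourcePlan = { A },  targetPlan = { B -> C }

    // the move is bottom-up: each successor's subtree is moved first,
    // then root is removed from the old plan and the root->successor
    // edges are re-created in the new plan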
Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Fri May 30 19:07:23 2014
@@ -128,6 +128,43 @@ public class JarManager {
return pkgClass;
}
}
+
+ public static void createBootStrapJar(OutputStream os, PigContext pigContext) throws IOException {
+ JarOutputStream jarFile = new JarOutputStream(os);
+ HashMap<String, String> contents = new HashMap<String, String>();
+ Vector<JarListEntry> jarList = new Vector<JarListEntry>();
+
+ for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
+ addContainingJar(jarList, pkgToSend.getPkgClass(), pkgToSend.getPkgPrefix(), pigContext);
+ }
+
+ Iterator<JarListEntry> it = jarList.iterator();
+ while (it.hasNext()) {
+ JarListEntry jarEntry = it.next();
+ mergeJar(jarFile, jarEntry.jar, jarEntry.prefix, contents);
+ }
+
+ // Just like in MR Pig, we'll add the script files to Job.jar. For Jython, MR Pig packages
+ // all the dependencies in Job.jar. For JRuby, we need the resource pigudf.rb, which is in
+ // the pig jar. JavaScript files could be packaged either in the Job.jar or as Tez local
+ // resources; MR Pig adds them to the Job.jar so that's what we will do also. Groovy files
+ // must be added as Tez local resources in the TezPlanContainer (in MR Pig, Groovy UDFs
+ // are actually broken since they cannot be found by the GroovyScriptEngine).
+ for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
+ InputStream stream = null;
+ if (entry.getValue().exists()) {
+ stream = new FileInputStream(entry.getValue());
+ } else {
+ stream = PigContext.getClassLoader().getResourceAsStream(entry.getValue().getPath());
+ }
+ if (stream == null) {
+ throw new IOException("Cannot find " + entry.getValue().getPath());
+ }
+ addStream(jarFile, entry.getKey(), stream, contents);
+ }
+
+ jarFile.close();
+ }
/**
* Create a jarfile in a temporary path, that is a merge of all the jarfiles containing the
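A hedged invocation sketch for createBootStrapJar() (the output path is
illustrative; pigContext is the session's PigContext):

    OutputStream os = new FileOutputStream("/tmp/pig-bootstrap.jar");
    // merges the jars containing the default Pig packages plus any
    // registered script files; closing the JarOutputStream inside the
    // method also closes the underlying stream
    JarManager.createBootStrapJar(os, pigContext);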
Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Fri May 30 19:07:23 2014
@@ -26,20 +26,20 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
public class UDFContext {
-
+
private Configuration jconf = null;
private HashMap<UDFContextKey, Properties> udfConfs;
private Properties clientSysProps;
private static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
- private static final String UDF_CONTEXT = "pig.udf.context";
-
- private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() {
- @Override
- public UDFContext initialValue() {
- return new UDFContext();
- }
- };
-
+ private static final String UDF_CONTEXT = "pig.udf.context";
+
+ private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() {
+ @Override
+ public UDFContext initialValue() {
+ return new UDFContext();
+ }
+ };
+
private UDFContext() {
udfConfs = new HashMap<UDFContextKey, Properties>();
}
@@ -61,10 +61,22 @@ public class UDFContext {
/*
* internal pig use only - should NOT be called from user code
*/
+ public static void destroy() {
+ tss = new ThreadLocal<UDFContext>() {
+ @Override
+ public UDFContext initialValue() {
+ return new UDFContext();
+ }
+ };
+ }
+
+ /*
+ * internal pig use only - should NOT be called from user code
+ */
public void setClientSystemProps(Properties properties) {
clientSysProps = properties;
}
-
+
/**
* Get the System Properties (Read only) as on the client machine from where Pig
* was launched. This will include command line properties passed at launch
@@ -75,8 +87,8 @@ public class UDFContext {
return clientSysProps;
}
/**
- * Adds the JobConf to this singleton. Will be
- * called on the backend by the Map and Reduce
+ * Adds the JobConf to this singleton. Will be
+ * called on the backend by the Map and Reduce
* functions so that UDFs can obtain the JobConf
* on the backend.
*/
@@ -99,7 +111,7 @@ public class UDFContext {
/**
* Get a properties object that is specific to this UDF.
- * Note that if a given UDF is called multiple times in a script,
+ * Note that if a given UDF is called multiple times in a script,
* and each instance passes different arguments, then each will
* be provided with different configuration object.
* This can be used by loaders to pass their input object path
@@ -117,11 +129,11 @@ public class UDFContext {
* the UDF unique.
* @return A reference to the properties object specific to
* the calling UDF. This is a reference, not a copy.
- * Any changes to this object will automatically be
- * propogated to other instances of the UDF calling this
+ * Any changes to this object will automatically be
+ * propagated to other instances of the UDF calling this
* function.
*/
-
+
@SuppressWarnings("rawtypes")
public Properties getUDFProperties(Class c, String[] args) {
UDFContextKey k = generateKey(c, args);
@@ -135,7 +147,7 @@ public class UDFContext {
/**
* Get a properties object that is specific to this UDF.
- * Note that if a given UDF is called multiple times in a script,
+ * Note that if a given UDF is called multiple times in a script,
* they will all be provided the same configuration object. It
* is up to the UDF to make sure the multiple instances do not
* stomp on each other.
@@ -151,8 +163,8 @@ public class UDFContext {
* @param c of the UDF obtaining the properties object.
* @return A reference to the properties object specific to
* the calling UDF. This is a reference, not a copy.
- * Any changes to this object will automatically be
- * propogated to other instances of the UDF calling this
+ * Any changes to this object will automatically be
+ * propagated to other instances of the UDF calling this
* function.
*/
@SuppressWarnings("rawtypes")
@@ -165,7 +177,7 @@ public class UDFContext {
}
return p;
}
-
+
/**
@@ -180,7 +192,7 @@ public class UDFContext {
conf.set(UDF_CONTEXT, ObjectSerializer.serialize(udfConfs));
conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
}
-
+
/**
* Populate the udfConfs field. This function is intended to
* be called by Map.configure or Reduce.configure on the backend.
@@ -188,20 +200,20 @@ public class UDFContext {
* @throws IOException if underlying deseralization throws it
*/
@SuppressWarnings("unchecked")
- public void deserialize() throws IOException {
+ public void deserialize() throws IOException {
udfConfs = (HashMap<UDFContextKey, Properties>)ObjectSerializer.deserialize(jconf.get(UDF_CONTEXT));
clientSysProps = (Properties)ObjectSerializer.deserialize(
jconf.get(CLIENT_SYS_PROPS));
}
-
+
private UDFContextKey generateKey(Class<?> c, String[] args) {
return new UDFContextKey(c.getName(), args);
}
-
+
public void reset() {
udfConfs.clear();
}
-
+
public boolean isUDFConfEmpty() {
return udfConfs.isEmpty();
}
@@ -217,10 +229,11 @@ public class UDFContext {
|| (jconf.get("mapred.task.id") == null &&
jconf.get("mapreduce.job.application.attempt.id") == null));
}
-
+
/**
* Make a shallow copy of the context.
*/
+ @Override
public UDFContext clone() {
UDFContext other = new UDFContext();
other.clientSysProps = this.clientSysProps;
@@ -228,10 +241,10 @@ public class UDFContext {
other.udfConfs = this.udfConfs;
return other;
}
-
+
/**
- * Class that acts as key for hashmap in UDFContext,
- * it holds the class and args of the udf, and
+ * Class that acts as key for hashmap in UDFContext,
+ * it holds the class and args of the udf, and
* implements equals() and hashCode()
*/
private static class UDFContextKey implements Serializable{
@@ -239,13 +252,13 @@ public class UDFContext {
private static final long serialVersionUID = 1;
private String className;
private String[] args;
-
+
UDFContextKey(){
}
UDFContextKey(String className, String [] args){
this.className = className;
- this.args = args;
+ this.args = args;
}
/* (non-Javadoc)
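For illustration, how a UDF typically uses the per-UDF properties bag
(MyLoader, location and the property key are illustrative):

    UDFContext ctx = UDFContext.getUDFContext();
    // the same (class, args) key yields the same Properties object on the
    // front end and, once serialize()/deserialize() have shipped udfConfs
    // through the job conf, on the backend as well
    Properties p = ctx.getUDFProperties(MyLoader.class, new String[] { location });
    p.setProperty("my.loader.location", location); // stored on the front end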
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1598702&r1=1598701&r2=1598702&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Fri May 30 19:07:23 2014
@@ -23,8 +23,10 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.io.SequenceInputStream;
+import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Enumeration;
@@ -39,8 +41,11 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobConf;
@@ -57,6 +62,7 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.ReadToEndLoader;
@@ -305,7 +311,7 @@ public class Utils {
return false;
}
}
-
+
public String supportedCodecsToString() {
StringBuffer sb = new StringBuffer();
boolean first = true;
@@ -313,7 +319,7 @@ public class Utils {
if(first) {
first = false;
} else {
- sb.append(",");
+ sb.append(",");
}
sb.append(codec.name());
}
@@ -520,6 +526,54 @@ public class Utils {
}
}
+ /**
+ * If the url is not already in HDFS, copies it there from the local
+ * filesystem before adding it to the distributed cache.
+ * @param pigContext the pigContext
+ * @param conf the job conf
+ * @param url the url to be added to the distributed cache
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ public static void putJarOnClassPathThroughDistributedCache(PigContext pigContext,
+ Configuration conf, URL url) throws IOException {
+ // Turn on the symlink feature
+ DistributedCache.createSymlink(conf);
+
+ // REGISTER always copies locally the jar file. see PigServer.registerJar()
+ Path pathInHDFS = Utils.shipToHDFS(pigContext, conf, url);
+ // and add to the DistributedCache
+ DistributedCache.addFileToClassPath(pathInHDFS, conf);
+ pigContext.skipJars.add(url.getPath());
+ }
+
+ /**
+ * Copies the file at the given url to a temporary path on HDFS.
+ * @param pigContext the pig context
+ * @param conf the job conf
+ * @param url the url to ship to hdfs
+ * @return the location where it was shipped
+ * @throws IOException
+ */
+ public static Path shipToHDFS(PigContext pigContext, Configuration conf, URL url)
+ throws IOException {
+ String path = url.getPath();
+ int slash = path.lastIndexOf("/");
+ String suffix = slash == -1 ? path : path.substring(slash+1);
+
+ Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix);
+ FileSystem fs = dst.getFileSystem(conf);
+ OutputStream os = fs.create(dst);
+ try {
+ IOUtils.copyBytes(url.openStream(), os, 4096, true);
+ } finally {
+ // copyBytes(close=true) never runs if url.openStream() throws, so
+ // close the output stream here to avoid leaking it in that case
+ os.close();
+ }
+ return dst;
+ }
+
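A usage sketch tying the two new helpers together (the local jar path is
illustrative; pigContext and conf come from the surrounding job setup):

    URL url = new File("/local/path/udfs.jar").toURI().toURL();

    // ship to a temporary HDFS path and expose it on the task classpath:
    Utils.putJarOnClassPathThroughDistributedCache(pigContext, conf, url);

    // or just ship it, without registering it on the classpath:
    Path dst = Utils.shipToHDFS(pigContext, conf, url);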
public static String getStackStraceStr(Throwable e) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);