Posted to commits@hive.apache.org by na...@apache.org on 2010/04/01 02:12:16 UTC
svn commit: r929755 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/plan/
ql/src/test/queries/clientpositive/ ql/src/test/results/clie...
Author: namit
Date: Thu Apr 1 00:12:16 2010
New Revision: 929755
URL: http://svn.apache.org/viewvc?rev=929755&view=rev
Log:
HIVE-1276. Remove extra reduce sink if it directly follows another reduce sink
(He Yongqiang via namit)
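The pattern this removes shows up, for example, when inserting into a
bucketed table from a query that already clusters by the bucketing key.
The following lines, taken from the new reduce_deduplicate.q test, hit it:
    set hive.enforce.bucketing = true;
    CREATE TABLE bucket5_1(key string, value string) CLUSTERED BY (key) INTO 2 BUCKETS;
    insert overwrite table bucket5_1
    select * from src cluster by key;
The CLUSTER BY and the bucketing enforcement would otherwise each introduce
a reduce sink keyed on the same column; with the new pass the plan in
reduce_deduplicate.q.out runs as a single map-reduce job.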
Added:
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q
hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Apr 1 00:12:16 2010
@@ -160,6 +160,9 @@ Trunk - Unreleased
HIVE-1278. Partition name to values conversion method
(Paul Yang via namit)
+ HIVE-1276. Remove extra reduce sink if it directly follows another reduce sink
+ (He Yongqiang via namit)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Apr 1 00:12:16 2010
@@ -219,6 +219,8 @@ public class HiveConf extends Configurat
HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
HIVEENFORCESORTING("hive.enforce.sorting", false),
HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
+
+ HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
// Optimizer
HIVEOPTCP("hive.optimize.cp", true), // column pruner
@@ -226,6 +228,7 @@ public class HiveConf extends Configurat
HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
+ HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
;
public final String varname;
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Thu Apr 1 00:12:16 2010
@@ -515,4 +515,10 @@
<description>Maximum number of worker threads in the Thrift server's pool.</description>
</property>
+<property>
+ <name>hive.optimize.reducededuplication</name>
+ <value>true</value>
+ <description>Remove extra map-reduce jobs if the data is already clustered by the same key that needs to be used again. This should always be left set to true; it has been made configurable only because it is a new feature.</description>
+</property>
+
</configuration>
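The pass is on by default; it can be disabled per session if it ever
misbehaves, e.g.:
    set hive.optimize.reducededuplication=false;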
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Thu Apr 1 00:12:16 2010
@@ -64,6 +64,9 @@ public class Optimizer {
}
transformations.add(new UnionProcessor());
transformations.add(new JoinReorder());
+ if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
+ transformations.add(new ReduceSinkDeDuplication());
+ }
}
/**
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java?rev=929755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java Thu Apr 1 00:12:16 2010
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+/**
+ * If two reduce sink operators share the same partition/sort columns, we
+ * should merge them. This should happen after map join optimization because map
+ * join optimization will remove reduce sink operators.
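+ *
+ * For example, an insert into a bucketed table from a query that already
+ * clusters by the bucketing key yields two consecutive reduce sinks on the
+ * same key; the child reduce sink is then replaced with a simple select.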
+ */
+public class ReduceSinkDeDuplication implements Transform {
+
+ protected ParseContext pGraphContext;
+
+ @Override
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+ pGraphContext = pctx;
+
+ // create the processor context shared by all the node processors
+ ReduceSinkDeduplicateProcCtx cppCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
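+ // Rule R1 fires on a reduce sink operator that has another reduce
+ // sink operator somewhere above it on the walked stack.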
+ opRules.put(new RuleRegExp("R1", "RS%.*RS%"), ReduceSinkDeduplicateProcFactory
+ .getReducerReducerProc());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(ReduceSinkDeduplicateProcFactory
+ .getDefaultProc(), opRules, cppCtx);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ // Create a list of top operator nodes to start walking from
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pGraphContext.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ return pGraphContext;
+ }
+
+ class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx {
+ ParseContext pctx;
+ List<ReduceSinkOperator> rejectedRSList;
+
+ public ReduceSinkDeduplicateProcCtx(ParseContext pctx) {
+ rejectedRSList = new ArrayList<ReduceSinkOperator>();
+ this.pctx = pctx;
+ }
+
+ public boolean contains(ReduceSinkOperator rsOp) {
+ return rejectedRSList.contains(rsOp);
+ }
+
+ public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) {
+ if (!rejectedRSList.contains(rsOp)) {
+ rejectedRSList.add(rsOp);
+ }
+ }
+
+ public ParseContext getPctx() {
+ return pctx;
+ }
+
+ public void setPctx(ParseContext pctx) {
+ this.pctx = pctx;
+ }
+ }
+
+ static class ReduceSinkDeduplicateProcFactory {
+
+ public static NodeProcessor getReducerReducerProc() {
+ return new ReducerReducerProc();
+ }
+
+ public static NodeProcessor getDefaultProc() {
+ return new DefaultProc();
+ }
+
+ /*
+ * do nothing.
+ */
+ static class DefaultProc implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
+ return null;
+ }
+ }
+
+ static class ReducerReducerProc implements NodeProcessor {
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
+ ReduceSinkDeduplicateProcCtx ctx = (ReduceSinkDeduplicateProcCtx) procCtx;
+ ReduceSinkOperator childReduceSink = (ReduceSinkOperator)nd;
+
+ if(ctx.contains(childReduceSink)) {
+ return null;
+ }
+
+ ParseContext pGraphContext = ctx.getPctx();
+ HashMap<String, String> childColumnMapping = getPartitionAndKeyColumnMapping(childReduceSink);
+ ReduceSinkOperator parentRS = findSingleParentReduceSink(childReduceSink, pGraphContext);
+ if (parentRS == null) {
+ ctx.addRejectedReduceSinkOperator(childReduceSink);
+ return null;
+ }
+ HashMap<String, String> parentColumnMapping = getPartitionAndKeyColumnMapping(parentRS);
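+ // Backtrack both operators' key/partition columns to the operator
+ // just above the parent reduce sink (or to the parent itself when it
+ // has no parent), so that the two column sets can be compared in a
+ // common namespace.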
+ Operator<? extends Serializable> stopBacktrackFlagOp = null;
+ if (parentRS.getParentOperators() == null
+ || parentRS.getParentOperators().size() == 0) {
+ stopBacktrackFlagOp = parentRS;
+ } else if (parentRS.getParentOperators().size() != 1) {
+ return null;
+ } else {
+ stopBacktrackFlagOp = parentRS.getParentOperators().get(0);
+ }
+
+ boolean succeed = backTrackColumnNames(childColumnMapping, childReduceSink, stopBacktrackFlagOp, pGraphContext);
+ if (!succeed) {
+ return null;
+ }
+ succeed = backTrackColumnNames(parentColumnMapping, parentRS, stopBacktrackFlagOp, pGraphContext);
+ if (!succeed) {
+ return null;
+ }
+
+ boolean same = compareReduceSink(childReduceSink, parentRS, childColumnMapping, parentColumnMapping);
+ if (!same) {
+ return null;
+ }
+ replaceReduceSinkWithSelectOperator(childReduceSink, pGraphContext);
+ return null;
+ }
+
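+ /*
+ * Replace the child reduce sink (and the extract operator that follows
+ * it, if any) with a select over the parent's output that simply
+ * forwards the reduce sink's value columns.
+ */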
+ private void replaceReduceSinkWithSelectOperator(
+ ReduceSinkOperator childReduceSink, ParseContext pGraphContext) throws SemanticException {
+ List<Operator<? extends Serializable>> parentOp = childReduceSink.getParentOperators();
+ List<Operator<? extends Serializable>> childOp = childReduceSink.getChildOperators();
+
+ Operator<? extends Serializable> oldParent = childReduceSink;
+
+ if (childOp != null && childOp.size() == 1
+ && ((childOp.get(0)) instanceof ExtractOperator)) {
+ oldParent = childOp.get(0);
+ childOp = childOp.get(0).getChildOperators();
+ }
+
+ Operator<? extends Serializable> input = parentOp.get(0);
+ input.getChildOperators().clear();
+
+ RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRR();
+
+ ArrayList<ExprNodeDesc> exprs = new ArrayList<ExprNodeDesc>();
+ ArrayList<String> outputs = new ArrayList<String>();
+ List<String> outputCols = childReduceSink.getConf().getOutputValueColumnNames();
+ RowResolver outputRS = new RowResolver();
+
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+ for (int i = 0; i < outputCols.size(); i++) {
+ String internalName = outputCols.get(i);
+ String[] nm = inputRR.reverseLookup(internalName);
+ ColumnInfo valueInfo = inputRR.get(nm[0], nm[1]);
+ ExprNodeDesc colDesc = childReduceSink.getConf().getValueCols().get(i);
+ exprs.add(colDesc);
+ outputs.add(internalName);
+ outputRS.put(nm[0], nm[1], new ColumnInfo(internalName, valueInfo
+ .getType(), nm[0], valueInfo.getIsPartitionCol()));
+ colExprMap.put(internalName, colDesc);
+ }
+
+ SelectDesc select = new SelectDesc(exprs, outputs, false);
+
+ SelectOperator sel = (SelectOperator) putOpInsertMap(
+ OperatorFactory.getAndMakeChild(select, new RowSchema(inputRR
+ .getColumnInfos()), input), inputRR, pGraphContext);
+
+ sel.setColumnExprMap(colExprMap);
+
+ // Insert the select operator in between.
+ sel.setChildOperators(childOp);
+ for (Operator<? extends Serializable> ch : childOp) {
+ ch.replaceParent(oldParent, sel);
+ }
+
+ }
+
+ private Operator<? extends Serializable> putOpInsertMap(
+ Operator<? extends Serializable> op, RowResolver rr, ParseContext pGraphContext) {
+ OpParseContext ctx = new OpParseContext(rr);
+ pGraphContext.getOpParseCtx().put(op, ctx);
+ return op;
+ }
+
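+ /*
+ * Two reduce sinks can be merged when their partition and key columns
+ * match after backtracking. The child's sort order and reducer count
+ * are folded into the parent when the parent leaves them unspecified;
+ * a genuine conflict aborts the merge.
+ */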
+ private boolean compareReduceSink(ReduceSinkOperator childReduceSink,
+ ReduceSinkOperator parentRS,
+ HashMap<String, String> childColumnMapping,
+ HashMap<String, String> parentColumnMapping) {
+
+ ArrayList<ExprNodeDesc> childPartitionCols = childReduceSink.getConf().getPartitionCols();
+ ArrayList<ExprNodeDesc> parentPartitionCols = parentRS.getConf().getPartitionCols();
+
+ boolean ret = compareExprNodes(childColumnMapping, parentColumnMapping,
+ childPartitionCols, parentPartitionCols);
+ if (!ret) {
+ return false;
+ }
+
+ ArrayList<ExprNodeDesc> childReduceKeyCols = childReduceSink.getConf().getKeyCols();
+ ArrayList<ExprNodeDesc> parentReduceKeyCols = parentRS.getConf().getKeyCols();
+ ret = compareExprNodes(childColumnMapping, parentColumnMapping,
+ childReduceKeyCols, parentReduceKeyCols);
+ if (!ret) {
+ return false;
+ }
+
+ String childRSOrder = childReduceSink.getConf().getOrder();
+ String parentRSOrder = parentRS.getConf().getOrder();
+ boolean moveChildRSOrderToParent = false;
+ //move child reduce sink's order to the parent reduce sink operator.
+ if (childRSOrder != null && !(childRSOrder.trim().equals(""))) {
+ if (parentRSOrder == null
+ || !childRSOrder.trim().equals(parentRSOrder.trim())) {
+ return false;
+ }
+ } else {
+ if(parentRSOrder == null || parentRSOrder.trim().equals("")) {
+ moveChildRSOrderToParent = true;
+ }
+ }
+
+ int childNumReducers = childReduceSink.getConf().getNumReducers();
+ int parentNumReducers = parentRS.getConf().getNumReducers();
+ boolean moveChildReducerNumToParent = false;
+ //move child reduce sink's number reducers to the parent reduce sink operator.
+ if (childNumReducers != parentNumReducers) {
+ if (childNumReducers == -1) {
+ //do nothing.
+ } else if (parentNumReducers == -1) {
+ //set childNumReducers in the parent reduce sink operator.
+ moveChildReducerNumToParent = true;
+ } else {
+ return false;
+ }
+ }
+
+ if(moveChildRSOrderToParent) {
+ parentRS.getConf().setOrder(childRSOrder);
+ }
+
+ if(moveChildReducerNumToParent) {
+ parentRS.getConf().setNumReducers(childNumReducers);
+ }
+
+ return true;
+ }
+
+ private boolean compareExprNodes(HashMap<String, String> childColumnMapping,
+ HashMap<String, String> parentColumnMapping,
+ ArrayList<ExprNodeDesc> childColExprs,
+ ArrayList<ExprNodeDesc> parentColExprs) {
+
+ boolean childEmpty = childColExprs == null || childColExprs.size() == 0;
+ boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0;
+
+ if (childEmpty) { //both empty
+ return true;
+ }
+
+ //child not empty here
+ if (parentEmpty) { // child not empty, but parent empty
+ return false;
+ }
+
+ if (childColExprs.size() != parentColExprs.size()) {
+ return false;
+ }
+ int i = 0;
+ while (i < childColExprs.size()) {
+ ExprNodeDesc childExpr = childColExprs.get(i);
+ ExprNodeDesc parentExpr = parentColExprs.get(i);
+
+ if ((childExpr instanceof ExprNodeColumnDesc)
+ && (parentExpr instanceof ExprNodeColumnDesc)) {
+ String childCol = childColumnMapping
+ .get(((ExprNodeColumnDesc) childExpr).getColumn());
+ String parentCol = parentColumnMapping
+ .get(((ExprNodeColumnDesc) parentExpr).getColumn());
+
+ if (!childCol.equals(parentCol)) {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ i++;
+ }
+ return true;
+ }
+
+ /*
+ * back track column names to find their corresponding original column
+ * names. Only allow simple operators like 'select column' or filter.
+ */
+ private boolean backTrackColumnNames(
+ HashMap<String, String> columnMapping,
+ ReduceSinkOperator reduceSink,
+ Operator<? extends Serializable> stopBacktrackFlagOp, ParseContext pGraphContext) {
+ Operator<? extends Serializable> startOperator = reduceSink;
+ while (startOperator != null && startOperator != stopBacktrackFlagOp) {
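+ // The seeded column names already refer to the reduce sink's input,
+ // so step to the parent before translating; operators without a
+ // column expression map are transparent and are skipped.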
+ startOperator = startOperator.getParentOperators().get(0);
+ Map<String, ExprNodeDesc> colExprMap = startOperator.getColumnExprMap();
+ if(colExprMap == null || colExprMap.size()==0) {
+ continue;
+ }
+ Iterator<String> keyIter = columnMapping.keySet().iterator();
+ while (keyIter.hasNext()) {
+ String key = keyIter.next();
+ String oldCol = columnMapping.get(key);
+ ExprNodeDesc exprNode = colExprMap.get(oldCol);
+ if(exprNode instanceof ExprNodeColumnDesc) {
+ String col = ((ExprNodeColumnDesc)exprNode).getColumn();
+ columnMapping.put(key, col);
+ } else {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
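+ /*
+ * Seed an identity mapping for every column referenced by the reduce
+ * sink's partition and key expressions; backTrackColumnNames later
+ * rewrites the values to the original column names.
+ */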
+ private HashMap<String, String> getPartitionAndKeyColumnMapping(ReduceSinkOperator reduceSink) {
+ HashMap<String, String> columnMapping = new HashMap<String, String> ();
+ ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
+ ArrayList<ExprNodeDesc> partitionCols = reduceSinkDesc.getPartitionCols();
+ ArrayList<ExprNodeDesc> reduceKeyCols = reduceSinkDesc.getKeyCols();
+ if(partitionCols != null) {
+ for (ExprNodeDesc desc : partitionCols) {
+ List<String> cols = desc.getCols();
+ for(String col : cols) {
+ columnMapping.put(col, col);
+ }
+ }
+ }
+ if(reduceKeyCols != null) {
+ for (ExprNodeDesc desc : reduceKeyCols) {
+ List<String> cols = desc.getCols();
+ for(String col : cols) {
+ columnMapping.put(col, col);
+ }
+ }
+ }
+ return columnMapping;
+ }
+
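+ /*
+ * Walk up through a chain of single-parent, row-preserving operators
+ * (select, filter, extract, forward, and script when
+ * hive.exec.script.trust is set) until a reduce sink is found; any
+ * other shape aborts the merge.
+ */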
+ private ReduceSinkOperator findSingleParentReduceSink(ReduceSinkOperator childReduceSink, ParseContext pGraphContext) {
+ Operator<? extends Serializable> start = childReduceSink;
+ while(start != null) {
+ if (start.getParentOperators() == null
+ || start.getParentOperators().size() != 1) {
+ // this potentially is a join operator
+ return null;
+ }
+
+ boolean allowed = false;
+ if ((start instanceof SelectOperator)
+ || (start instanceof FilterOperator)
+ || (start instanceof ExtractOperator)
+ || (start instanceof ForwardOperator)
+ || (start instanceof ScriptOperator)
+ || (start instanceof ReduceSinkOperator)) {
+ allowed = true;
+ }
+
+ if (!allowed) {
+ return null;
+ }
+
+ if ((start instanceof ScriptOperator)
+ && !HiveConf.getBoolVar(pGraphContext.getConf(),
+ HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) {
+ return null;
+ }
+
+ start = start.getParentOperators().get(0);
+ if(start instanceof ReduceSinkOperator) {
+ return (ReduceSinkOperator)start;
+ }
+ }
+ return null;
+ }
+ }
+
+ }
+}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Thu Apr 1 00:12:16 2010
@@ -177,5 +177,11 @@ public class ReduceSinkDesc implements S
return keySerializeInfo.getProperties().getProperty(
org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER);
}
+
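+ /**
+ * Set the sort order of the key columns; one character per key column,
+ * '+' for ascending and '-' for descending.
+ */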
+ public void setOrder(String orderStr) {
+ keySerializeInfo.getProperties().setProperty(
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER,
+ orderStr);
+ }
}
Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q?rev=929755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q Thu Apr 1 00:12:16 2010
@@ -0,0 +1,43 @@
+set hive.enforce.bucketing = true;
+set hive.exec.reducers.max = 1;
+set hive.exec.script.trust = true;
+
+drop table bucket5_1;
+
+CREATE TABLE bucket5_1(key string, value string) CLUSTERED BY (key) INTO 2 BUCKETS;
+explain extended
+insert overwrite table bucket5_1
+select * from src cluster by key;
+
+insert overwrite table bucket5_1
+select * from src cluster by key;
+
+select sum(hash(key)),sum(hash(value)) from bucket5_1;
+select sum(hash(key)),sum(hash(value)) from src;
+
+drop table complex_tbl_1;
+create table complex_tbl_1(aid string, bid string, t int, ctime string, etime bigint, l string, et string) partitioned by (ds string);
+
+drop table complex_tbl_2;
+create table complex_tbl_2(aet string, aes string) partitioned by (ds string);
+
+explain extended
+insert overwrite table complex_tbl_1 partition (ds='2010-03-29')
+select s2.* from
+(
+ select TRANSFORM (aid,bid,t,ctime,etime,l,et)
+ USING 'cat'
+ AS (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+ from
+ (
+ select transform(aet,aes)
+ using 'cat'
+ as (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+ from complex_tbl_2 where ds ='2010-03-29' cluster by bid
+)s
+)s2;
+
+drop table complex_tbl_2;
+drop table complex_tbl_1;
+drop table bucket5_1;
+
Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out?rev=929755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out Thu Apr 1 00:12:16 2010
@@ -0,0 +1,412 @@
+PREHOOK: query: drop table bucket5_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket5_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE bucket5_1(key string, value string) CLUSTERED BY (key) INTO 2 BUCKETS
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE bucket5_1(key string, value string) CLUSTERED BY (key) INTO 2 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@bucket5_1
+PREHOOK: query: explain extended
+insert overwrite table bucket5_1
+select * from src cluster by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain extended
+insert overwrite table bucket5_1
+select * from src cluster by key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB bucket5_1)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_CLUSTERBY (TOK_TABLE_OR_COL key))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Select Operator
+ expressions:
+ expr: key
+ type: string
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: string
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ Needs Tagging: false
+ Path -> Alias:
+ file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/src [src]
+ Path -> Partition:
+ file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/src
+ Partition
+ base file name: src
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count -1
+ columns key,value
+ columns.types string:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/src
+ name src
+ serialization.ddl struct src { string key, string value}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1270070048
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count -1
+ columns key,value
+ columns.types string:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/src
+ name src
+ serialization.ddl struct src { string key, string value}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1270070048
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: src
+ name: src
+ Reduce Operator Tree:
+ Extract
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ directory: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-08_990_7618431442918354593/10000
+ NumFilesPerFileSink: 2
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count 2
+ bucket_field_name key
+ columns key,value
+ columns.types string:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/bucket5_1
+ name bucket5_1
+ serialization.ddl struct bucket5_1 { string key, string value}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1270070048
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: bucket5_1
+ TotalFiles: 2
+ MultiFileSpray: true
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ replace: true
+ source: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-08_990_7618431442918354593/10000
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count 2
+ bucket_field_name key
+ columns key,value
+ columns.types string:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/bucket5_1
+ name bucket5_1
+ serialization.ddl struct bucket5_1 { string key, string value}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1270070048
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: bucket5_1
+ tmp directory: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-08_990_7618431442918354593/10001
+
+
+PREHOOK: query: insert overwrite table bucket5_1
+select * from src cluster by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@bucket5_1
+POSTHOOK: query: insert overwrite table bucket5_1
+select * from src cluster by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@bucket5_1
+PREHOOK: query: select sum(hash(key)),sum(hash(value)) from bucket5_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket5_1
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-29_030_3957486725242181321/10000
+POSTHOOK: query: select sum(hash(key)),sum(hash(value)) from bucket5_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucket5_1
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-29_030_3957486725242181321/10000
+21025334 36210398070
+PREHOOK: query: select sum(hash(key)),sum(hash(value)) from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-34_046_8143626013163823870/10000
+POSTHOOK: query: select sum(hash(key)),sum(hash(value)) from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-34_046_8143626013163823870/10000
+21025334 36210398070
+PREHOOK: query: drop table complex_tbl_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table complex_tbl_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table complex_tbl_1(aid string, bid string, t int, ctime string, etime bigint, l string, et string) partitioned by (ds string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table complex_tbl_1(aid string, bid string, t int, ctime string, etime bigint, l string, et string) partitioned by (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@complex_tbl_1
+PREHOOK: query: drop table complex_tbl_2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table complex_tbl_2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table complex_tbl_2(aet string, aes string) partitioned by (ds string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table complex_tbl_2(aet string, aes string) partitioned by (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@complex_tbl_2
+PREHOOK: query: explain extended
+insert overwrite table complex_tbl_1 partition (ds='2010-03-29')
+select s2.* from
+(
+ select TRANSFORM (aid,bid,t,ctime,etime,l,et)
+ USING 'cat'
+ AS (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+ from
+ (
+ select transform(aet,aes)
+ using 'cat'
+ as (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+ from complex_tbl_2 where ds ='2010-03-29' cluster by bid
+)s
+)s2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain extended
+insert overwrite table complex_tbl_1 partition (ds='2010-03-29')
+select s2.* from
+(
+ select TRANSFORM (aid,bid,t,ctime,etime,l,et)
+ USING 'cat'
+ AS (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+ from
+ (
+ select transform(aet,aes)
+ using 'cat'
+ as (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+ from complex_tbl_2 where ds ='2010-03-29' cluster by bid
+)s
+)s2
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF complex_tbl_2)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_TABLE_OR_COL aet) (TOK_TABLE_OR_COL aes)) TOK_SERDE TOK_RECORDWRITER 'cat' TOK_SERDE TOK_RECORDREADER (TOK_TABCOLLIST (TOK_TABCOL aid TOK_STRING) (TOK_TABCOL bid TOK_STRING) (TOK_TABCOL t TOK_INT) (TOK_TABCOL ctime TOK_STRING) (TOK_TABCOL etime TOK_BIGINT) (TOK_TABCOL l TOK_STRING) (TOK_TABCOL et TOK_STRING))))) (TOK_WHERE (= (TOK_TABLE_OR_COL ds) '2010-03-29')) (TOK_CLUSTERBY (TOK_TABLE_OR_COL bid)))) s)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_TABLE_OR_COL aid) (TOK_TABLE_OR_COL bid) (TOK_TABLE_OR_COL t) (TOK_TABLE_OR_COL ctime) (TOK_TABLE_OR_COL etime) (TOK_TABLE_OR_COL l) (TOK_TABLE_OR_COL et)) TOK_SERDE TOK_RECORDWRITER 'cat' TOK_SERDE TOK_RECORDREADER (TOK_TABCOLLIST (TOK_TABCOL aid TOK_STRING) (TOK_TABCOL bid TOK_STRING) (TOK_TABCOL t TOK_INT) (TOK_TABCOL ctime TOK_STRING) (TOK_TABCOL etime TOK_BIGINT) (TOK_TABCOL l TOK_STRING) (TOK_TABCOL et TOK_STRING))))))) s2)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB complex_tbl_1 (TOK_PARTSPEC (TOK_PARTVAL ds '2010-03-29')))) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF s2)))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Alias -> Map Operator Tree:
+ s2:s:complex_tbl_2
+ TableScan
+ alias: complex_tbl_2
+ Filter Operator
+ isSamplingPred: false
+ predicate:
+ expr: (ds = '2010-03-29')
+ type: boolean
+ Filter Operator
+ isSamplingPred: false
+ predicate:
+ expr: (ds = '2010-03-29')
+ type: boolean
+ Select Operator
+ expressions:
+ expr: aet
+ type: string
+ expr: aes
+ type: string
+ outputColumnNames: _col0, _col1
+ Transform Operator
+ command: cat
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ columns _col0,_col1,_col2,_col3,_col4,_col5,_col6
+ columns.types string,string,int,string,bigint,string,string
+ field.delim 9
+ serialization.format 9
+ Reduce Output Operator
+ key expressions:
+ expr: _col1
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col1
+ type: string
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ expr: _col2
+ type: int
+ expr: _col3
+ type: string
+ expr: _col4
+ type: bigint
+ expr: _col5
+ type: string
+ expr: _col6
+ type: string
+ Needs Tagging: false
+ Reduce Operator Tree:
+ Extract
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ expr: _col2
+ type: int
+ expr: _col3
+ type: string
+ expr: _col4
+ type: bigint
+ expr: _col5
+ type: string
+ expr: _col6
+ type: string
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+ Transform Operator
+ command: cat
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ columns _col0,_col1,_col2,_col3,_col4,_col5,_col6
+ columns.types string,string,int,string,bigint,string,string
+ field.delim 9
+ serialization.format 9
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: string
+ expr: _col2
+ type: int
+ expr: _col3
+ type: string
+ expr: _col4
+ type: bigint
+ expr: _col5
+ type: string
+ expr: _col6
+ type: string
+ outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ directory: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-39_140_5528759727183093646/10000
+ NumFilesPerFileSink: 1
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count -1
+ columns aid,bid,t,ctime,etime,l,et
+ columns.types string:string:int:string:bigint:string:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/complex_tbl_1
+ name complex_tbl_1
+ partition_columns ds
+ serialization.ddl struct complex_tbl_1 { string aid, string bid, i32 t, string ctime, i64 etime, string l, string et}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1270070079
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: complex_tbl_1
+ TotalFiles: 1
+ MultiFileSpray: false
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ ds 2010-03-29
+ replace: true
+ source: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-39_140_5528759727183093646/10000
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ properties:
+ bucket_count -1
+ columns aid,bid,t,ctime,etime,l,et
+ columns.types string:string:int:string:bigint:string:string
+ file.inputformat org.apache.hadoop.mapred.TextInputFormat
+ file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/complex_tbl_1
+ name complex_tbl_1
+ partition_columns ds
+ serialization.ddl struct complex_tbl_1 { string aid, string bid, i32 t, string ctime, i64 etime, string l, string et}
+ serialization.format 1
+ serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ transient_lastDdlTime 1270070079
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: complex_tbl_1
+ tmp directory: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-39_140_5528759727183093646/10001
+
+
+PREHOOK: query: drop table complex_tbl_2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table complex_tbl_2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@complex_tbl_2
+PREHOOK: query: drop table complex_tbl_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table complex_tbl_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@complex_tbl_1
+PREHOOK: query: drop table bucket5_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket5_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@bucket5_1