You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2010/04/01 02:12:16 UTC

svn commit: r929755 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/src/test/queries/clientpositive/ ql/src/test/results/clie...

Author: namit
Date: Thu Apr  1 00:12:16 2010
New Revision: 929755

URL: http://svn.apache.org/viewvc?rev=929755&view=rev
Log:
HIVE-1276. Remove extra reducesink if it is already followed by a reduce sink
(He Yongqiang via namit)


Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Apr  1 00:12:16 2010
@@ -160,6 +160,9 @@ Trunk -  Unreleased
     HIVE-1278. Partition name to values conversion conversion method
     (Paul Yang via namit)
 
+    HIVE-1276. Remove extra reducesink if it is already followed by a reduce sink
+    (He Yongqiang via namit)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Apr  1 00:12:16 2010
@@ -219,6 +219,8 @@ public class HiveConf extends Configurat
     HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
     HIVEENFORCESORTING("hive.enforce.sorting", false),
     HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
+    
+    HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
 
     // Optimizer
     HIVEOPTCP("hive.optimize.cp", true), // column pruner
@@ -226,6 +228,7 @@ public class HiveConf extends Configurat
     HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
+    HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), 
     ;
 
     public final String varname;

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Thu Apr  1 00:12:16 2010
@@ -515,4 +515,10 @@
   <description>Maximum number of worker threads in the Thrift server's pool.</description>
 </property>
 
+<property>
+  <name>hive.optimize.reducededuplication</name>
+  <value>true</value>
+  <description>Remove extra map-reduce jobs if the data is already clustered by the same key which needs to be used again. This should always be set to true. Since it is a new feature, it has been made configurable.</description>
+</property>
+
 </configuration>

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Thu Apr  1 00:12:16 2010
@@ -64,6 +64,9 @@ public class Optimizer {
     }
     transformations.add(new UnionProcessor());
     transformations.add(new JoinReorder());
+    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
+      transformations.add(new ReduceSinkDeDuplication());
+    }
   }
 
   /**

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java?rev=929755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkDeDuplication.java Thu Apr  1 00:12:16 2010
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+
+/**
+ * If two reduce sink operators on the same operator path share the same
+ * partition/sort columns, the downstream one is redundant and is replaced by a
+ * select operator. This transformation should run after the map join
+ * optimization, because that optimization removes reduce sink operators.
+ */
+public class ReduceSinkDeDuplication implements Transform{
+
+  protected ParseContext pGraphContext;
+  
+  /**
+   * Walks the operator graph from every top operator and lets the
+   * reducer-reducer processor remove reduce sinks that repeat the work of an
+   * upstream reduce sink.
+   *
+   * @param pctx parse context whose operator graph is examined
+   * @return the same parse context that was passed in
+   * @throws SemanticException if the graph walk or a node processor fails
+   */
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    pGraphContext = pctx;
+
+    // Context handed to every processor call; it keeps the parse context and
+    // the reduce sinks that were already rejected.
+    ReduceSinkDeduplicateProcCtx dedupCtx = new ReduceSinkDeduplicateProcCtx(pGraphContext);
+
+    // Single rule: a reduce sink reached through a path that already contains
+    // a reduce sink.
+    NodeProcessor reducerReducerProc = ReduceSinkDeduplicateProcFactory.getReducerReducerProc();
+    Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
+    rules.put(new RuleRegExp("R1", "RS%.*RS%"), reducerReducerProc);
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along; everything else gets the default
+    // processor.
+    NodeProcessor defaultProc = ReduceSinkDeduplicateProcFactory.getDefaultProc();
+    Dispatcher dispatcher = new DefaultRuleDispatcher(defaultProc, rules, dedupCtx);
+    GraphWalker walker = new DefaultGraphWalker(dispatcher);
+
+    // Walk starting from all top operators of the query.
+    ArrayList<Node> roots = new ArrayList<Node>(pGraphContext.getTopOps().values());
+    walker.startWalking(roots, null);
+    return pGraphContext;
+  }
+  
+  /**
+   * Context shared by the node processors of this transformation. It holds the
+   * parse context and the list of reduce sinks that were already rejected.
+   */
+  class ReduceSinkDeduplicateProcCtx implements NodeProcessorCtx{
+    ParseContext pctx;
+    List<ReduceSinkOperator> rejectedRSList;
+
+    public ReduceSinkDeduplicateProcCtx(ParseContext pctx) {
+      this.pctx = pctx;
+      this.rejectedRSList = new ArrayList<ReduceSinkOperator>();
+    }
+
+    /**
+     * @param rsOp reduce sink to look up
+     * @return true if rsOp was recorded as rejected
+     */
+    public boolean contains (ReduceSinkOperator rsOp) {
+      return rejectedRSList.contains(rsOp);
+    }
+
+    /**
+     * Records rsOp as rejected. Recording an operator that is already in the
+     * list leaves the list unchanged.
+     *
+     * @param rsOp reduce sink to record
+     */
+    public void addRejectedReduceSinkOperator(ReduceSinkOperator rsOp) {
+      if (rejectedRSList.contains(rsOp)) {
+        return;
+      }
+      rejectedRSList.add(rsOp);
+    }
+
+    /** @return the parse context held by this context */
+    public ParseContext getPctx() {
+      return pctx;
+    }
+
+    /** @param pctx the parse context to hold from now on */
+    public void setPctx(ParseContext pctx) {
+      this.pctx = pctx;
+    }
+  }
+  
+  
+  static class ReduceSinkDeduplicateProcFactory {
+    
+
+    public static NodeProcessor getReducerReducerProc() {
+      return new ReducerReducerProc();
+    }
+
+    public static NodeProcessor getDefaultProc() {
+      return new DefaultProc();
+    }
+
+    /*
+     * Processor returned by getDefaultProc(). It does nothing: process()
+     * ignores all of its arguments and always returns null.
+     */
+    static class DefaultProc implements NodeProcessor {
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        return null;
+      }
+    }
+    
+    static class ReducerReducerProc implements NodeProcessor {
+      /**
+       * Examines the reduce sink {@code nd} and, when an upstream reduce sink
+       * already partitions and sorts on the same original columns, replaces
+       * {@code nd} with a select operator.
+       *
+       * @param nd the reduce sink operator being visited
+       * @param stack not used by this processor
+       * @param procCtx the {@link ReduceSinkDeduplicateProcCtx} of this walk
+       * @param nodeOutputs not used by this processor
+       * @return always null
+       * @throws SemanticException if replacing the reduce sink fails
+       */
+      @Override
+      public Object process(Node nd, Stack<Node> stack,
+          NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
+        ReduceSinkDeduplicateProcCtx dedupCtx = (ReduceSinkDeduplicateProcCtx) procCtx;
+        ReduceSinkOperator childRS = (ReduceSinkOperator) nd;
+
+        // This reduce sink was examined before and has no usable parent.
+        if (dedupCtx.contains(childRS)) {
+          return null;
+        }
+
+        ParseContext parseCtx = dedupCtx.getPctx();
+        HashMap<String, String> childCols = getPartitionAndKeyColumnMapping(childRS);
+        ReduceSinkOperator parentRS = findSingleParentReduceSink(childRS, parseCtx);
+        if (parentRS == null) {
+          dedupCtx.addRejectedReduceSinkOperator(childRS);
+          return null;
+        }
+        HashMap<String, String> parentCols = getPartitionAndKeyColumnMapping(parentRS);
+
+        // Column names of both reduce sinks are traced back to the operator
+        // feeding the parent reduce sink (or to the parent itself when it has
+        // no input). A parent with several inputs is not handled.
+        List<Operator<? extends Serializable>> parentInputs = parentRS.getParentOperators();
+        Operator<? extends Serializable> stopOp;
+        if (parentInputs == null || parentInputs.size() == 0) {
+          stopOp = parentRS;
+        } else if (parentInputs.size() == 1) {
+          stopOp = parentInputs.get(0);
+        } else {
+          return null;
+        }
+
+        // Each step only runs when the previous one succeeded.
+        boolean redundant = backTrackColumnNames(childCols, childRS, stopOp, parseCtx)
+            && backTrackColumnNames(parentCols, parentRS, stopOp, parseCtx)
+            && compareReduceSink(childRS, parentRS, childCols, parentCols);
+        if (redundant) {
+          replaceReduceSinkWithSelectOperator(childRS, parseCtx);
+        }
+        return null;
+      }
+
+      /**
+       * Takes {@code childReduceSink} out of the operator graph and puts a
+       * select operator in its place. When the reduce sink's only child is an
+       * extract operator, that operator is removed as well. The select's
+       * expressions are the reduce sink's value columns and its output names
+       * are the reduce sink's output value column names.
+       *
+       * @param childReduceSink the reduce sink to replace
+       * @param pGraphContext parse context whose operator map is updated
+       * @throws SemanticException declared for callers; propagated from the
+       *           row resolver calls
+       */
+      private void replaceReduceSinkWithSelectOperator(
+          ReduceSinkOperator childReduceSink, ParseContext pGraphContext) throws SemanticException {
+        List<Operator<? extends Serializable>> parents = childReduceSink.getParentOperators();
+        List<Operator<? extends Serializable>> downstream = childReduceSink.getChildOperators();
+
+        // The operator that the downstream operators currently list as parent.
+        Operator<? extends Serializable> replaced = childReduceSink;
+
+        boolean followedByExtract = downstream != null && downstream.size() == 1
+            && (downstream.get(0) instanceof ExtractOperator);
+        if (followedByExtract) {
+          replaced = downstream.get(0);
+          downstream = replaced.getChildOperators();
+        }
+
+        // Detach the input from its children before the select is attached.
+        Operator<? extends Serializable> input = parents.get(0);
+        input.getChildOperators().clear();
+
+        RowResolver inputRR = pGraphContext.getOpParseCtx().get(input).getRR();
+
+        ArrayList<ExprNodeDesc> selectExprs = new ArrayList<ExprNodeDesc>();
+        ArrayList<String> selectColNames = new ArrayList<String>();
+        List<String> valueColNames = childReduceSink.getConf().getOutputValueColumnNames();
+        // NOTE(review): outputRS is filled below but not used afterwards in
+        // this method; the select is registered with inputRR instead.
+        RowResolver outputRS = new RowResolver();
+
+        Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+        int pos = 0;
+        for (String internalName : valueColNames) {
+          String[] tabAndAlias = inputRR.reverseLookup(internalName);
+          ColumnInfo inputInfo = inputRR.get(tabAndAlias[0], tabAndAlias[1]);
+          ExprNodeDesc valueExpr = childReduceSink.getConf().getValueCols().get(pos);
+          selectExprs.add(valueExpr);
+          selectColNames.add(internalName);
+          ColumnInfo outputInfo = new ColumnInfo(internalName, inputInfo.getType(),
+              tabAndAlias[0], inputInfo.getIsPartitionCol());
+          outputRS.put(tabAndAlias[0], tabAndAlias[1], outputInfo);
+          colExprMap.put(internalName, valueExpr);
+          pos++;
+        }
+
+        SelectDesc selectDesc = new SelectDesc(selectExprs, selectColNames, false);
+        RowSchema selectSchema = new RowSchema(inputRR.getColumnInfos());
+        Operator<? extends Serializable> created =
+            OperatorFactory.getAndMakeChild(selectDesc, selectSchema, input);
+        SelectOperator sel = (SelectOperator) putOpInsertMap(created, inputRR, pGraphContext);
+
+        sel.setColumnExprMap(colExprMap);
+
+        // Hook the former downstream operators below the new select.
+        sel.setChildOperators(downstream);
+        for (Operator<? extends Serializable> below : downstream) {
+          below.replaceParent(replaced, sel);
+        }
+
+      }
+      
+      /**
+       * Registers {@code op} in the parse context's operator map with a new
+       * OpParseContext built from {@code rr}.
+       *
+       * @param op operator to register
+       * @param rr row resolver to associate with the operator
+       * @param pGraphContext parse context that owns the operator map
+       * @return {@code op}, unchanged
+       */
+      private Operator<? extends Serializable> putOpInsertMap(
+          Operator<? extends Serializable> op, RowResolver rr, ParseContext pGraphContext) {
+        pGraphContext.getOpParseCtx().put(op, new OpParseContext(rr));
+        return op;
+      }
+
+      /**
+       * Decides whether {@code childReduceSink} repeats the shuffle already
+       * performed by {@code parentRS}. Partition columns, key columns, sort
+       * order and number of reducers are compared in that order. Only when all
+       * checks pass is {@code parentRS} modified to take over the child's
+       * settings.
+       *
+       * @param childReduceSink the downstream reduce sink
+       * @param parentRS the upstream reduce sink
+       * @param childColumnMapping child column name to back-tracked column name
+       * @param parentColumnMapping parent column name to back-tracked column name
+       * @return true if the child reduce sink can be removed
+       */
+      private boolean compareReduceSink(ReduceSinkOperator childReduceSink,
+          ReduceSinkOperator parentRS,
+          HashMap<String, String> childColumnMapping,
+          HashMap<String, String> parentColumnMapping) {
+
+        ReduceSinkDesc childConf = childReduceSink.getConf();
+        ReduceSinkDesc parentConf = parentRS.getConf();
+
+        if (!compareExprNodes(childColumnMapping, parentColumnMapping,
+            childConf.getPartitionCols(), parentConf.getPartitionCols())) {
+          return false;
+        }
+
+        if (!compareExprNodes(childColumnMapping, parentColumnMapping,
+            childConf.getKeyCols(), parentConf.getKeyCols())) {
+          return false;
+        }
+
+        String childRSOrder = childConf.getOrder();
+        String parentRSOrder = parentConf.getOrder();
+        boolean childHasOrder = childRSOrder != null && !(childRSOrder.trim().equals(""));
+        boolean parentHasOrder = parentRSOrder != null && !(parentRSOrder.trim().equals(""));
+
+        // A child that asks for a sort order needs the parent to produce
+        // exactly that order.
+        if (childHasOrder
+            && (parentRSOrder == null || !childRSOrder.trim().equals(parentRSOrder.trim()))) {
+          return false;
+        }
+
+        // When neither side has an order, the child's (blank) order string is
+        // copied to the parent. A null order is never copied: setOrder() stores
+        // the value in a Properties object, which rejects null values with a
+        // NullPointerException.
+        // NOTE(review): a non-blank child order is never moved to a parent
+        // without one (that case returns false above) - confirm this is
+        // intended.
+        boolean moveChildRSOrderToParent = !childHasOrder && !parentHasOrder
+            && childRSOrder != null;
+
+        int childNumReducers = childConf.getNumReducers();
+        int parentNumReducers = parentConf.getNumReducers();
+        boolean moveChildReducerNumToParent = false;
+        // -1 on the child side imposes no requirement (presumably "not set");
+        // -1 on the parent side lets the parent adopt the child's reducer
+        // count. Two different explicit counts cannot be reconciled.
+        if (childNumReducers != parentNumReducers && childNumReducers != -1) {
+          if (parentNumReducers != -1) {
+            return false;
+          }
+          moveChildReducerNumToParent = true;
+        }
+
+        // All checks passed; only now touch the parent.
+        if (moveChildRSOrderToParent) {
+          parentConf.setOrder(childRSOrder);
+        }
+
+        if (moveChildReducerNumToParent) {
+          parentConf.setNumReducers(childNumReducers);
+        }
+
+        return true;
+      }
+
+      /**
+       * Compares two expression lists position by position. Both expressions
+       * at a position must be plain column references, and the back-tracked
+       * name of the child's column must equal the back-tracked name of the
+       * parent's column.
+       *
+       * @param childColumnMapping child column name to back-tracked column name
+       * @param parentColumnMapping parent column name to back-tracked column name
+       * @param childColExprs expressions of the child reduce sink; may be null
+       * @param parentColExprs expressions of the parent reduce sink; may be null
+       * @return true if the child list is empty, or if both lists have the
+       *         same length and refer to the same back-tracked columns in the
+       *         same order
+       */
+      private boolean compareExprNodes(HashMap<String, String> childColumnMapping,
+          HashMap<String, String> parentColumnMapping,
+          ArrayList<ExprNodeDesc> childColExprs,
+          ArrayList<ExprNodeDesc> parentColExprs) {
+
+        boolean childEmpty = childColExprs == null || childColExprs.size() == 0;
+        boolean parentEmpty = parentColExprs == null || parentColExprs.size() == 0;
+
+        // A child without columns imposes no requirement, whatever the parent
+        // has.
+        if (childEmpty) {
+          return true;
+        }
+
+        // Child has columns but the parent has none.
+        if (parentEmpty) {
+          return false;
+        }
+
+        if (childColExprs.size() != parentColExprs.size()) {
+          return false;
+        }
+
+        for (int i = 0; i < childColExprs.size(); i++) {
+          ExprNodeDesc childExpr = childColExprs.get(i);
+          ExprNodeDesc parentExpr = parentColExprs.get(i);
+
+          // Only plain column references can be compared by name.
+          if (!(childExpr instanceof ExprNodeColumnDesc)
+              || !(parentExpr instanceof ExprNodeColumnDesc)) {
+            return false;
+          }
+
+          String childCol = childColumnMapping
+              .get(((ExprNodeColumnDesc) childExpr).getColumn());
+          // The parent's name is looked up with the PARENT expression's column
+          // (the child expression was used here before, so the parent's actual
+          // column was never checked).
+          String parentCol = parentColumnMapping
+              .get(((ExprNodeColumnDesc) parentExpr).getColumn());
+
+          // A column missing from either mapping counts as a mismatch.
+          if (childCol == null || !childCol.equals(parentCol)) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      /*
+       * Back-tracks the column names stored as values in columnMapping to the
+       * names they have at stopBacktrackFlagOp. Starting at reduceSink, each
+       * first parent is visited in turn (the stop operator included); if it
+       * has a column expression map, every mapped name is replaced by the
+       * column its expression refers to. Only plain column references can be
+       * followed: any other expression, or a name missing from the map, makes
+       * the method return false, possibly after some entries were already
+       * rewritten.
+       */
+      private boolean backTrackColumnNames(
+          HashMap<String, String> columnMapping,
+          ReduceSinkOperator reduceSink,
+          Operator<? extends Serializable> stopBacktrackFlagOp, ParseContext pGraphContext) {
+        Operator<? extends Serializable> current = reduceSink;
+        while (current != null && current != stopBacktrackFlagOp) {
+          current = current.getParentOperators().get(0);
+          Map<String, ExprNodeDesc> colExprMap = current.getColumnExprMap();
+          // Operators without a column expression map leave names untouched.
+          if (colExprMap == null || colExprMap.size() == 0) {
+            continue;
+          }
+          for (Map.Entry<String, String> mapping : columnMapping.entrySet()) {
+            ExprNodeDesc source = colExprMap.get(mapping.getValue());
+            if (!(source instanceof ExprNodeColumnDesc)) {
+              return false;
+            }
+            // Overwrites the value of an existing key only.
+            mapping.setValue(((ExprNodeColumnDesc) source).getColumn());
+          }
+        }
+
+        return true;
+      }
+
+      /**
+       * Builds the initial column mapping for a reduce sink: every column
+       * referenced by its partition columns or key columns is mapped to
+       * itself.
+       *
+       * @param reduceSink the reduce sink whose columns are collected
+       * @return a new map from column name to the same column name
+       */
+      private HashMap<String, String> getPartitionAndKeyColumnMapping(ReduceSinkOperator reduceSink) {
+        HashMap<String, String> columnMapping = new HashMap<String, String> ();
+        ReduceSinkDesc reduceSinkDesc = reduceSink.getConf();
+        addIdentityMappings(columnMapping, reduceSinkDesc.getPartitionCols());
+        addIdentityMappings(columnMapping, reduceSinkDesc.getKeyCols());
+        return columnMapping;
+      }
+
+      /**
+       * Adds a name-to-itself entry to columnMapping for every column that the
+       * given expressions reference. A null expression list is ignored.
+       */
+      private void addIdentityMappings(HashMap<String, String> columnMapping,
+          List<ExprNodeDesc> exprs) {
+        if (exprs == null) {
+          return;
+        }
+        for (ExprNodeDesc desc : exprs) {
+          List<String> cols = desc.getCols();
+          // NOTE(review): getCols() appears to return null for expressions
+          // that reference no column (e.g. constants) - confirm. Iterating a
+          // null list would throw a NullPointerException, so it is skipped.
+          if (cols == null) {
+            continue;
+          }
+          for (String col : cols) {
+            columnMapping.put(col, col);
+          }
+        }
+      }
+
+      /**
+       * Walks upwards from {@code childReduceSink} through single-parent
+       * operators and returns the first reduce sink found above it. The walk
+       * only continues through select, filter, extract, forward, script and
+       * reduce sink operators, and through a script operator only when
+       * HIVESCRIPTOPERATORTRUST is enabled in the configuration.
+       *
+       * @param childReduceSink reduce sink to start from
+       * @param pGraphContext parse context providing the configuration
+       * @return the nearest upstream reduce sink, or null if the walk is
+       *         stopped before one is reached
+       */
+      private ReduceSinkOperator findSingleParentReduceSink(ReduceSinkOperator childReduceSink, ParseContext pGraphContext) {
+        Operator<? extends Serializable> current = childReduceSink;
+        while (current != null) {
+          List<Operator<? extends Serializable>> parents = current.getParentOperators();
+          if (parents == null || parents.size() != 1) {
+            // this potentially is a join operator
+            return null;
+          }
+
+          boolean isScript = current instanceof ScriptOperator;
+          boolean passThrough = isScript
+              || (current instanceof SelectOperator)
+              || (current instanceof FilterOperator)
+              || (current instanceof ExtractOperator)
+              || (current instanceof ForwardOperator)
+              || (current instanceof ReduceSinkOperator);
+          if (!passThrough) {
+            return null;
+          }
+
+          // The configuration is consulted only for script operators.
+          if (isScript
+              && !HiveConf.getBoolVar(pGraphContext.getConf(),
+                  HiveConf.ConfVars.HIVESCRIPTOPERATORTRUST)) {
+            return null;
+          }
+
+          current = parents.get(0);
+          if (current instanceof ReduceSinkOperator) {
+            return (ReduceSinkOperator) current;
+          }
+        }
+        return null;
+      }
+    }
+    
+  }
+}

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=929755&r1=929754&r2=929755&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Thu Apr  1 00:12:16 2010
@@ -177,5 +177,11 @@ public class ReduceSinkDesc implements S
     return keySerializeInfo.getProperties().getProperty(
         org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER);
   }
+  
+  /**
+   * Stores orderStr as the SERIALIZATION_SORT_ORDER property of the key
+   * serialization info.
+   *
+   * @param orderStr the sort order string to store; it is passed straight to
+   *          Properties.setProperty, which does not accept null
+   */
+  public void setOrder(String orderStr) {
+    keySerializeInfo.getProperties().setProperty(
+        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER,
+        orderStr);
+  }
 
 }

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q?rev=929755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/reduce_deduplicate.q Thu Apr  1 00:12:16 2010
@@ -0,0 +1,43 @@
+set hive.enforce.bucketing = true;
+set hive.exec.reducers.max = 1;
+set hive.exec.script.trust = true;
+
+drop table bucket5_1;
+
+CREATE TABLE bucket5_1(key string, value string) CLUSTERED BY (key) INTO 2 BUCKETS;
+explain extended
+insert overwrite table bucket5_1
+select * from src cluster by key;
+
+insert overwrite table bucket5_1
+select * from src cluster by key;
+
+select sum(hash(key)),sum(hash(value)) from bucket5_1;
+select sum(hash(key)),sum(hash(value)) from src;
+
+drop table complex_tbl_1;
+create table complex_tbl_1(aid string, bid string, t int, ctime string, etime bigint, l string, et string) partitioned by (ds string);
+
+drop table complex_tbl_2;
+create table complex_tbl_2(aet string, aes string) partitioned by (ds string);
+
+explain extended
+insert overwrite table complex_tbl_1 partition (ds='2010-03-29')
+select s2.* from
+(
+ select TRANSFORM (aid,bid,t,ctime,etime,l,et)
+ USING 'cat'
+ AS (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+ from
+  (
+   select transform(aet,aes)
+   using 'cat'
+   as (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+   from complex_tbl_2 where ds ='2010-03-29' cluster by bid
+)s
+)s2;
+
+drop table complex_tbl_2;
+drop table complex_tbl_1;
+drop table bucket5_1;
+

Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out?rev=929755&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/reduce_deduplicate.q.out Thu Apr  1 00:12:16 2010
@@ -0,0 +1,412 @@
+PREHOOK: query: drop table bucket5_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket5_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE bucket5_1(key string, value string) CLUSTERED BY (key) INTO 2 BUCKETS
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE bucket5_1(key string, value string) CLUSTERED BY (key) INTO 2 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@bucket5_1
+PREHOOK: query: explain extended
+insert overwrite table bucket5_1
+select * from src cluster by key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain extended
+insert overwrite table bucket5_1
+select * from src cluster by key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF src)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB bucket5_1)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_CLUSTERBY (TOK_TABLE_OR_COL key))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        src 
+          TableScan
+            alias: src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              Reduce Output Operator
+                key expressions:
+                      expr: _col0
+                      type: string
+                sort order: +
+                Map-reduce partition columns:
+                      expr: _col0
+                      type: string
+                tag: -1
+                value expressions:
+                      expr: _col0
+                      type: string
+                      expr: _col1
+                      type: string
+      Needs Tagging: false
+      Path -> Alias:
+        file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/src [src]
+      Path -> Partition:
+        file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/src 
+          Partition
+            base file name: src
+            input format: org.apache.hadoop.mapred.TextInputFormat
+            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+            properties:
+              bucket_count -1
+              columns key,value
+              columns.types string:string
+              file.inputformat org.apache.hadoop.mapred.TextInputFormat
+              file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/src
+              name src
+              serialization.ddl struct src { string key, string value}
+              serialization.format 1
+              serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              transient_lastDdlTime 1270070048
+            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+          
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                bucket_count -1
+                columns key,value
+                columns.types string:string
+                file.inputformat org.apache.hadoop.mapred.TextInputFormat
+                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/src
+                name src
+                serialization.ddl struct src { string key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                transient_lastDdlTime 1270070048
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: src
+            name: src
+      Reduce Operator Tree:
+        Extract
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+                  expr: _col1
+                  type: string
+            outputColumnNames: _col0, _col1
+            File Output Operator
+              compressed: false
+              GlobalTableId: 1
+              directory: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-08_990_7618431442918354593/10000
+              NumFilesPerFileSink: 2
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  properties:
+                    bucket_count 2
+                    bucket_field_name key
+                    columns key,value
+                    columns.types string:string
+                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
+                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/bucket5_1
+                    name bucket5_1
+                    serialization.ddl struct bucket5_1 { string key, string value}
+                    serialization.format 1
+                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    transient_lastDdlTime 1270070048
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: bucket5_1
+              TotalFiles: 2
+              MultiFileSpray: true
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          source: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-08_990_7618431442918354593/10000
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                bucket_count 2
+                bucket_field_name key
+                columns key,value
+                columns.types string:string
+                file.inputformat org.apache.hadoop.mapred.TextInputFormat
+                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/bucket5_1
+                name bucket5_1
+                serialization.ddl struct bucket5_1 { string key, string value}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                transient_lastDdlTime 1270070048
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: bucket5_1
+          tmp directory: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-08_990_7618431442918354593/10001
+
+
+PREHOOK: query: insert overwrite table bucket5_1
+select * from src cluster by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@bucket5_1
+POSTHOOK: query: insert overwrite table bucket5_1
+select * from src cluster by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@bucket5_1
+PREHOOK: query: select sum(hash(key)),sum(hash(value)) from bucket5_1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@bucket5_1
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-29_030_3957486725242181321/10000
+POSTHOOK: query: select sum(hash(key)),sum(hash(value)) from bucket5_1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@bucket5_1
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-29_030_3957486725242181321/10000
+21025334	36210398070
+PREHOOK: query: select sum(hash(key)),sum(hash(value)) from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-34_046_8143626013163823870/10000
+POSTHOOK: query: select sum(hash(key)),sum(hash(value)) from src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-34_046_8143626013163823870/10000
+21025334	36210398070
+PREHOOK: query: drop table complex_tbl_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table complex_tbl_1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table complex_tbl_1(aid string, bid string, t int, ctime string, etime bigint, l string, et string) partitioned by (ds string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table complex_tbl_1(aid string, bid string, t int, ctime string, etime bigint, l string, et string) partitioned by (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@complex_tbl_1
+PREHOOK: query: drop table complex_tbl_2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table complex_tbl_2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table complex_tbl_2(aet string, aes string) partitioned by (ds string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table complex_tbl_2(aet string, aes string) partitioned by (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@complex_tbl_2
+PREHOOK: query: explain extended
+insert overwrite table complex_tbl_1 partition (ds='2010-03-29')
+select s2.* from
+(
+ select TRANSFORM (aid,bid,t,ctime,etime,l,et)
+ USING 'cat'
+ AS (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+ from
+  (
+   select transform(aet,aes)
+   using 'cat'
+   as (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+   from complex_tbl_2 where ds ='2010-03-29' cluster by bid
+)s
+)s2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain extended
+insert overwrite table complex_tbl_1 partition (ds='2010-03-29')
+select s2.* from
+(
+ select TRANSFORM (aid,bid,t,ctime,etime,l,et)
+ USING 'cat'
+ AS (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+ from
+  (
+   select transform(aet,aes)
+   using 'cat'
+   as (aid string, bid string, t int, ctime string, etime bigint, l string, et string)
+   from complex_tbl_2 where ds ='2010-03-29' cluster by bid
+)s
+)s2
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF complex_tbl_2)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_TABLE_OR_COL aet) (TOK_TABLE_OR_COL aes)) TOK_SERDE TOK_RECORDWRITER 'cat' TOK_SERDE TOK_RECORDREADER (TOK_TABCOLLIST (TOK_TABCOL aid TOK_STRING) (TOK_TABCOL bid TOK_STRING) (TOK_TABCOL t TOK_INT) (TOK_TABCOL ctime TOK_STRING) (TOK_TABCOL etime TOK_BIGINT) (TOK_TABCOL l TOK_STRING) (TOK_TABCOL et TOK_STRING))))) (TOK_WHERE (= (TOK_TABLE_OR_COL ds) '2010-03-29')) (TOK_CLUSTERBY (TOK_TABLE_OR_COL bid)))) s)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TRANSFORM (TOK_EXPLIST (TOK_TABLE_OR_COL aid) (TOK_TABLE_OR_COL bid) (TOK_TABLE_OR_COL t) (TOK_TABLE_OR_COL ctime) (TOK_TABLE_OR_COL etime) (TOK_TABLE_OR_COL l) (TOK_TABLE_OR_COL et)) TOK_SERDE TOK_RECORDWRITER 'cat' TOK_SERDE TOK_RECORDREADER (TOK_TABCOLLIST (TOK_TABCOL aid TOK_STRING) (TOK_TABCOL bid TOK_STRING) (TOK_TABCOL t TOK_INT) (TOK_TABCOL ctime TOK_STRING) (TOK_TABCOL etime TOK_BIGINT) (TOK_TABCOL l TOK_STRING) (TOK_TABCOL et TOK_STRING))))))) s2)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB complex_tbl_1 (TOK_PARTSPEC (TOK_PARTVAL ds '2010-03-29')))) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF s2)))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        s2:s:complex_tbl_2 
+          TableScan
+            alias: complex_tbl_2
+            Filter Operator
+              isSamplingPred: false
+              predicate:
+                  expr: (ds = '2010-03-29')
+                  type: boolean
+              Filter Operator
+                isSamplingPred: false
+                predicate:
+                    expr: (ds = '2010-03-29')
+                    type: boolean
+                Select Operator
+                  expressions:
+                        expr: aet
+                        type: string
+                        expr: aes
+                        type: string
+                  outputColumnNames: _col0, _col1
+                  Transform Operator
+                    command: cat
+                    output info:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        properties:
+                          columns _col0,_col1,_col2,_col3,_col4,_col5,_col6
+                          columns.types string,string,int,string,bigint,string,string
+                          field.delim 9
+                          serialization.format 9
+                    Reduce Output Operator
+                      key expressions:
+                            expr: _col1
+                            type: string
+                      sort order: +
+                      Map-reduce partition columns:
+                            expr: _col1
+                            type: string
+                      tag: -1
+                      value expressions:
+                            expr: _col0
+                            type: string
+                            expr: _col1
+                            type: string
+                            expr: _col2
+                            type: int
+                            expr: _col3
+                            type: string
+                            expr: _col4
+                            type: bigint
+                            expr: _col5
+                            type: string
+                            expr: _col6
+                            type: string
+      Needs Tagging: false
+      Reduce Operator Tree:
+        Extract
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+                  expr: _col1
+                  type: string
+                  expr: _col2
+                  type: int
+                  expr: _col3
+                  type: string
+                  expr: _col4
+                  type: bigint
+                  expr: _col5
+                  type: string
+                  expr: _col6
+                  type: string
+            outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+            Transform Operator
+              command: cat
+              output info:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  properties:
+                    columns _col0,_col1,_col2,_col3,_col4,_col5,_col6
+                    columns.types string,string,int,string,bigint,string,string
+                    field.delim 9
+                    serialization.format 9
+              Select Operator
+                expressions:
+                      expr: _col0
+                      type: string
+                      expr: _col1
+                      type: string
+                      expr: _col2
+                      type: int
+                      expr: _col3
+                      type: string
+                      expr: _col4
+                      type: bigint
+                      expr: _col5
+                      type: string
+                      expr: _col6
+                      type: string
+                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 1
+                  directory: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-39_140_5528759727183093646/10000
+                  NumFilesPerFileSink: 1
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      properties:
+                        bucket_count -1
+                        columns aid,bid,t,ctime,etime,l,et
+                        columns.types string:string:int:string:bigint:string:string
+                        file.inputformat org.apache.hadoop.mapred.TextInputFormat
+                        file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/complex_tbl_1
+                        name complex_tbl_1
+                        partition_columns ds
+                        serialization.ddl struct complex_tbl_1 { string aid, string bid, i32 t, string ctime, i64 etime, string l, string et}
+                        serialization.format 1
+                        serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                        transient_lastDdlTime 1270070079
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: complex_tbl_1
+                  TotalFiles: 1
+                  MultiFileSpray: false
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds 2010-03-29
+          replace: true
+          source: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-39_140_5528759727183093646/10000
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              properties:
+                bucket_count -1
+                columns aid,bid,t,ctime,etime,l,et
+                columns.types string:string:int:string:bigint:string:string
+                file.inputformat org.apache.hadoop.mapred.TextInputFormat
+                file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                location file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/test/data/warehouse/complex_tbl_1
+                name complex_tbl_1
+                partition_columns ds
+                serialization.ddl struct complex_tbl_1 { string aid, string bid, i32 t, string ctime, i64 etime, string l, string et}
+                serialization.format 1
+                serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                transient_lastDdlTime 1270070079
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: complex_tbl_1
+          tmp directory: file:/Users/heyongqiang/Documents/workspace/Hive_RCFile/build/ql/scratchdir/hive_2010-03-31_14-14-39_140_5528759727183093646/10001
+
+
+PREHOOK: query: drop table complex_tbl_2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table complex_tbl_2
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@complex_tbl_2
+PREHOOK: query: drop table complex_tbl_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table complex_tbl_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@complex_tbl_1
+PREHOOK: query: drop table bucket5_1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table bucket5_1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@bucket5_1