You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/28 16:58:18 UTC

svn commit: r1518234 [1/2] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/a...

Author: hashutosh
Date: Wed Aug 28 14:58:17 2013
New Revision: 1518234

URL: http://svn.apache.org/r1518234
Log:
HIVE-3562 : Receipt NoAMIP41258408

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
    hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q
    hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q
    hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out
    hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/build.xml
    hive/trunk/ql/ivy.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Aug 28 14:58:17 2013
@@ -525,6 +525,8 @@ public class HiveConf extends Configurat
     HIVELIMITOPTLIMITFILE("hive.limit.optimize.limit.file", 10),
     HIVELIMITOPTENABLE("hive.limit.optimize.enable", false),
     HIVELIMITOPTMAXFETCH("hive.limit.optimize.fetch.max", 50000),
+    HIVELIMITPUSHDOWNMEMORYUSAGE("hive.limit.pushdown.memory.usage", -1f),
+
     HIVEHASHTABLETHRESHOLD("hive.hashtable.initialCapacity", 100000),
     HIVEHASHTABLELOADFACTOR("hive.hashtable.loadfactor", (float) 0.75),
     HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE("hive.mapjoin.followby.gby.localtask.max.memory.usage", (float) 0.55),

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Wed Aug 28 14:58:17 2013
@@ -431,19 +431,19 @@
 <property>
   <name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name>
   <value>0.3</value>
-  <description>Portion of total memory to be used by map-side grup aggregation hash table, when this group by is followed by map join</description>
+  <description>Portion of total memory to be used by map-side group aggregation hash table, when this group by is followed by map join</description>
 </property>
 
 <property>
   <name>hive.map.aggr.hash.force.flush.memory.threshold</name>
   <value>0.9</value>
-  <description>The max memory to be used by map-side grup aggregation hash table, if the memory usage is higher than this number, force to flush data</description>
+  <description>The max memory to be used by map-side group aggregation hash table, if the memory usage is higher than this number, force to flush data</description>
 </property>
 
 <property>
   <name>hive.map.aggr.hash.percentmemory</name>
   <value>0.5</value>
-  <description>Portion of total memory to be used by map-side grup aggregation hash table</description>
+  <description>Portion of total memory to be used by map-side group aggregation hash table</description>
 </property>
 
 <property>
@@ -1584,6 +1584,12 @@
 </property>
 
 <property>
+  <name>hive.limit.pushdown.memory.usage</name>
+  <value>0.3f</value>
+  <description>The max memory to be used for hash in RS operator for top K selection.</description>
+</property>
+
+<property>
   <name>hive.rework.mapredwork</name>
   <value>false</value>
   <description>should rework the mapred work or not.

Modified: hive/trunk/ql/build.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/build.xml?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/build.xml (original)
+++ hive/trunk/ql/build.xml Wed Aug 28 14:58:17 2013
@@ -250,15 +250,24 @@
         <exclude name="META-INF/MANIFEST.MF"/>
       </patternset>
     </unzip>
-    <unzip 
-      src="${build.ivy.lib.dir}/default/protobuf-java-${protobuf.version}.jar" 
+    <unzip
+      src="${build.ivy.lib.dir}/default/protobuf-java-${protobuf.version}.jar"
       dest="${build.dir.hive}/protobuf-java/classes">
       <patternset>
         <exclude name="META-INF"/>
         <exclude name="META-INF/MANIFEST.MF"/>
       </patternset>
     </unzip>
-    <unzip 
+    <unzip
+      src="${build.ivy.lib.dir}/default/guava-${guava.version}.jar"
+      dest="${build.dir.hive}/guava/classes">
+      <patternset>
+        <exclude name="META-INF"/>
+        <exclude name="META-INF/MANIFEST.MF"/>
+      </patternset>
+    </unzip>
+
+    <unzip
       src="${build.ivy.lib.dir}/default/snappy-${snappy.version}.jar" 
       dest="${build.dir.hive}/snappy/classes">
       <patternset>
@@ -296,14 +305,11 @@
       <fileset dir="${build.dir.hive}/shims/classes" includes="**/*.class"/>
       <fileset dir="${build.dir.hive}/javaewah/classes" includes="**/*.class"/>
       <fileset dir="${build.dir.hive}/javolution/classes" includes="**/*.class"/>
-      <fileset dir="${build.dir.hive}/protobuf-java/classes" 
-               includes="**/*.class"/>
-      <fileset dir="${build.dir.hive}/snappy/classes" 
-               includes="**/*.class"/>
-      <fileset dir="${build.dir.hive}/jackson-core-asl/classes"
-      	       includes="**/*.class"/>
-      <fileset dir="${build.dir.hive}/jackson-mapper-asl/classes"
-                 includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/protobuf-java/classes" includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/snappy/classes" includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/jackson-core-asl/classes" includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/jackson-mapper-asl/classes" includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/guava/classes" includes="**/*.class"/>
       <manifest>
         <!-- Not putting these in their own manifest section, since that inserts
              a new-line, which breaks the reading of the attributes. -->

Modified: hive/trunk/ql/ivy.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/ivy.xml?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/ivy.xml (original)
+++ hive/trunk/ql/ivy.xml Wed Aug 28 14:58:17 2013
@@ -47,7 +47,6 @@
     <dependency org="org.iq80.snappy" name="snappy" 
                 rev="${snappy.version}" transitive="false"/>
 
-    <!-- hadoop specific guava -->
     <dependency org="org.json" name="json" rev="${json.version}"/>
     <dependency org="commons-collections" name="commons-collections" rev="${commons-collections.version}"/>
     <dependency org="commons-configuration" name="commons-configuration" rev="${commons-configuration.version}"
@@ -80,6 +79,8 @@
       <exclude org="org.apache.commons" module="commons-daemon"/><!--bad POM-->
     </dependency>
 
+    <dependency org="com.google.guava" name="guava" rev="${guava.version}" transitive="false"/>
+
     <!-- Test Dependencies -->
     <dependency org="junit" name="junit" rev="${junit.version}" conf="test->default" />
     

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Wed Aug 28 14:58:17 2013
@@ -50,6 +50,11 @@ public class ExtractOperator extends Ope
     return OperatorType.EXTRACT;
   }
 
+  @Override
+  public boolean acceptLimitPushdown() {
+    return true;
+  }
+
   /**
    * @return the name of the operator
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Wed Aug 28 14:58:17 2013
@@ -41,6 +41,11 @@ public class ForwardOperator extends Ope
     return OperatorType.FORWARD;
   }
 
+  @Override
+  public boolean acceptLimitPushdown() {
+    return true;
+  }
+
   /**
    * @return the name of the operator
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Aug 28 14:58:17 2013
@@ -1181,4 +1181,15 @@ public class GroupByOperator extends Ope
   public OperatorType getType() {
     return OperatorType.GROUPBY;
   }
+
+  /**
+   * we can push the limit above GBY (running in Reducer), since that will generate single row
+   * for each group. This doesn't necessarily hold for GBY (running in Mappers),
+   * so we don't push limit above it.
+   */
+  @Override
+  public boolean acceptLimitPushdown() {
+    return getConf().getMode() == GroupByDesc.Mode.MERGEPARTIAL ||
+        getConf().getMode() == GroupByDesc.Mode.COMPLETE;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Aug 28 14:58:17 2013
@@ -592,6 +592,9 @@ public abstract class Operator<T extends
     state = State.CLOSE;
     LOG.info(id + " finished. closing... ");
 
+    // call the operator specific close routine
+    closeOp(abort);
+
     if (counterNameToEnum != null) {
       incrCounter(numInputRowsCntr, inputRows);
       incrCounter(numOutputRowsCntr, outputRows);
@@ -600,9 +603,6 @@ public abstract class Operator<T extends
 
     LOG.info(id + " forwarded " + cntr + " rows");
 
-    // call the operator specific close routine
-    closeOp(abort);
-
     try {
       logStats();
       if (childOperators == null) {
@@ -816,13 +816,7 @@ public abstract class Operator<T extends
       }
     }
 
-    if (isLogInfoEnabled) {
-      cntr++;
-      if (cntr == nextCntr) {
-        LOG.info(id + " forwarding " + cntr + " rows");
-        nextCntr = getNextCntr(cntr);
-      }
-    }
+    increaseForward(1);
 
     // For debugging purposes:
     // System.out.println("" + this.getClass() + ": " +
@@ -855,6 +849,18 @@ public abstract class Operator<T extends
     }
   }
 
+  void increaseForward(long counter) {
+    if (isLogInfoEnabled) {
+      cntr += counter;
+      if (cntr >= nextCntr) {
+        LOG.info(id + " forwarding " + cntr + " rows");
+        do {
+          nextCntr = getNextCntr(nextCntr);
+        } while(cntr >= nextCntr);
+      }
+    }
+  }
+
   public void resetStats() {
     for (Enum<?> e : statsMap.keySet()) {
       statsMap.get(e).set(0L);
@@ -1496,6 +1502,17 @@ public abstract class Operator<T extends
     return true;
   }
 
+  /**
+   * used for LimitPushdownOptimizer
+   *
+   * if all of the operators between limit and reduce-sink does not remove any input rows
+   * in the range of limit count, limit can be pushed down to reduce-sink operator.
+   * forward, select, etc.
+   */
+  public boolean acceptLimitPushdown() {
+    return false;
+  }
+
   @Override
   public String toString() {
     return getName() + "[" + getIdentifier() + "]";

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Wed Aug 28 14:58:17 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -44,13 +43,12 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 /**
  * Reduce Sink Operator sends output to the reduce stage.
  **/
 public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
-    implements Serializable {
+    implements Serializable, TopNHash.BinaryCollector {
 
   private static final long serialVersionUID = 1L;
 
@@ -90,6 +88,9 @@ public class ReduceSinkOperator extends 
     return inputAlias;
   }
 
+  // picks topN K:V pairs from input. can be null
+  private transient TopNHash reducerHash;
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
 
@@ -131,6 +132,8 @@ public class ReduceSinkOperator extends 
           .newInstance();
       valueSerializer.initialize(null, valueTableDesc.getProperties());
 
+      reducerHash = createTopKHash();
+
       firstRow = true;
       initializeChildren(hconf);
     } catch (Exception e) {
@@ -139,14 +142,44 @@ public class ReduceSinkOperator extends 
     }
   }
 
+  private TopNHash createTopKHash() {
+    int limit = conf.getTopN();
+    float percent = conf.getTopNMemoryUsage();
+    if (limit < 0 || percent <= 0) {
+      return null;
+    }
+    if (limit == 0) {
+      return TopNHash.create0();
+    }
+    // limit * 64 : compensation of arrays for key/value/hashcodes
+    long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
+    if (threshold < 0) {
+      return null;
+    }
+    return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
+  }
+
   transient InspectableObject tempInspectableObject = new InspectableObject();
   transient HiveKey keyWritable = new HiveKey();
-  transient Writable value;
 
   transient StructObjectInspector keyObjectInspector;
   transient StructObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
+  /**
+   * This two dimensional array holds key data and a corresponding Union object
+   * which contains the tag identifying the aggregate expression for distinct columns.
+   *
+   * If there is no distict expression, cachedKeys is simply like this.
+   * cachedKeys[0] = [col0][col1]
+   *
+   * with two distict expression, union(tag:key) is attatched for each distinct expression
+   * cachedKeys[0] = [col0][col1][0:dist1]
+   * cachedKeys[1] = [col0][col1][1:dist2]
+   *
+   * in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
+   * see {@link ExprNodeColumnEvaluator}
+   */
   transient Object[][] cachedKeys;
   transient Object[] cachedValues;
   transient List<List<Integer>> distinctColIndices;
@@ -198,6 +231,7 @@ public class ReduceSinkOperator extends 
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void processOp(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
@@ -241,8 +275,6 @@ public class ReduceSinkOperator extends 
       for (int i = 0; i < valueEval.length; i++) {
         cachedValues[i] = valueEval[i].evaluate(row);
       }
-      // Serialize the value
-      value = valueSerializer.serialize(cachedValues, valueObjectInspector);
 
       // Evaluate the keys
       Object[] distributionKeys = new Object[numDistributionKeys];
@@ -267,6 +299,8 @@ public class ReduceSinkOperator extends 
         // no distinct key
         System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
       }
+
+      BytesWritable value = null;
       // Serialize the keys and append the tag
       for (int i = 0; i < cachedKeys.length; i++) {
         if (keyIsText) {
@@ -294,26 +328,85 @@ public class ReduceSinkOperator extends 
           }
         }
         keyWritable.setHashCode(keyHashCode);
-        if (out != null) {
-          out.collect(keyWritable, value);
-          // Since this is a terminal operator, update counters explicitly -
-          // forward is not called
-          if (counterNameToEnum != null) {
-            ++outputRows;
-            if (outputRows % 1000 == 0) {
-              incrCounter(numOutputRowsCntr, outputRows);
-              outputRows = 0;
+
+        if (reducerHash == null) {
+          if (null != out) {
+            collect(keyWritable, value = getValue(row, value));
+          }
+       } else {
+          int index = reducerHash.indexOf(keyWritable);
+          if (index == TopNHash.EXCLUDED) {
+            continue;
+          }
+          value = getValue(row, value);
+          if (index >= 0) {
+            reducerHash.set(index, value);
+          } else {
+            if (index == TopNHash.FORWARD) {
+              collect(keyWritable, value);
+            } else if (index == TopNHash.FLUSH) {
+              LOG.info("Top-N hash is flushed");
+              reducerHash.flush();
+              // we can now retry adding key/value into hash, which is flushed.
+              // but for simplicity, just forward them
+              collect(keyWritable, value);
+            } else if (index == TopNHash.DISABLE) {
+              LOG.info("Top-N hash is disabled");
+              reducerHash.flush();
+              collect(keyWritable, value);
+              reducerHash = null;
             }
           }
         }
       }
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    } catch (IOException e) {
+    } catch (HiveException e) {
+      throw e;
+    } catch (Exception e) {
       throw new HiveException(e);
     }
   }
 
+  public void collect(BytesWritable key, BytesWritable value) throws IOException {
+    // Since this is a terminal operator, update counters explicitly -
+    // forward is not called
+    out.collect(key, value);
+    if (++outputRows % 1000 == 0) {
+      if (counterNameToEnum != null) {
+        incrCounter(numOutputRowsCntr, outputRows);
+      }
+      increaseForward(outputRows);
+      outputRows = 0;
+    }
+  }
+
+  // evaluate value lazily
+  private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
+    if (value != null) {
+      return value;
+    }
+    // Evaluate the value
+    for (int i = 0; i < valueEval.length; i++) {
+      cachedValues[i] = valueEval[i].evaluate(row);
+    }
+    // Serialize the value
+    return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector);
+  }
+
+  @Override
+  protected void closeOp(boolean abort) throws HiveException {
+    if (!abort && reducerHash != null) {
+      try {
+        reducerHash.flush();
+      } catch (IOException e) {
+        throw new HiveException(e);
+      } finally {
+        reducerHash = null;
+      }
+    }
+    reducerHash = null;
+    super.closeOp(abort);
+  }
+
   /**
    * @return the name of the operator
    */

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Wed Aug 28 14:58:17 2013
@@ -124,4 +124,9 @@ public class SelectOperator extends Oper
   public boolean supportUnionRemoveOptimization() {
     return true;
   }
+
+  @Override
+  public boolean acceptLimitPushdown() {
+    return true;
+  }
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1518234&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Wed Aug 28 14:58:17 2013
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.OutputCollector;
+
+/**
+ * Stores binary key/value in sorted manner to get top-n key/value
+ */
+abstract class TopNHash {
+
+  /**
+   * For interaction between operator and top-n hash.
+   * Currently only used to forward key/values stored in hash.
+   */
+  public static interface BinaryCollector extends OutputCollector<BytesWritable, BytesWritable> {
+  }
+
+  protected static final int FORWARD = -1;
+  protected static final int EXCLUDED = -2;
+  protected static final int FLUSH = -3;
+  protected static final int DISABLE = -4;
+
+  protected final int topN;
+  protected final BinaryCollector collector;
+
+  protected final long threshold;   // max heap size
+  protected long usage;             // heap usage (not exact)
+
+  // binary keys, binary values and hashcodes of keys, lined up by index
+  protected final byte[][] keys;
+  protected final byte[][] values;
+  protected final int[] hashes;
+
+  protected int evicted;    // recetly evicted index (the biggest one. used for next key/value)
+  protected int excluded;   // count of excluded rows from previous flush
+
+  protected final Comparator<Integer> C = new Comparator<Integer>() {
+    public int compare(Integer o1, Integer o2) {
+      byte[] key1 = keys[o1];
+      byte[] key2 = keys[o2];
+      return WritableComparator.compareBytes(key1, 0, key1.length, key2, 0, key2.length);
+    }
+  };
+
+  public static TopNHash create0() {
+    return new HashForLimit0();
+  }
+
+  public static TopNHash create(boolean grouped, int topN, long threshold,
+      BinaryCollector collector) {
+    if (topN == 0) {
+      return new HashForLimit0();
+    }
+    if (grouped) {
+      return new HashForGroup(topN, threshold, collector);
+    }
+    return new HashForRow(topN, threshold, collector);
+  }
+
+  TopNHash(int topN, long threshold, BinaryCollector collector) {
+    this.topN = topN;
+    this.threshold = threshold;
+    this.collector = collector;
+    this.keys = new byte[topN + 1][];
+    this.values = new byte[topN + 1][];
+    this.hashes = new int[topN + 1];
+    this.evicted = topN;
+  }
+
+  /**
+   * returns index for key/value/hashcode if it's acceptable.
+   * -1, -2, -3, -4 can be returned for other actions.
+   * <p/>
+   * -1 for FORWARD   : should be forwarded to output collector (for GBY)
+   * -2 for EXCLUDED  : not in top-k. ignore it
+   * -3 for FLUSH     : memory is not enough. flush values (keep keys only)
+   * -4 for DISABLE   : hash is not effective. flush and disable it
+   */
+  public int indexOf(HiveKey key) {
+    int size = size();
+    if (usage > threshold) {
+      return excluded == 0 ? DISABLE : FLUSH;
+    }
+    int index = size < topN ? size : evicted;
+    keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+    hashes[index] = key.hashCode();
+    if (!store(index)) {
+      // it's only for GBY which should forward all values associated with the key in the range
+      // of limit. new value should be attatched with the key but in current implementation,
+      // only one values is allowed. with map-aggreagtion which is true by default,
+      // this is not common case, so just forward new key/value and forget that (todo)
+      return FORWARD;
+    }
+    if (size == topN) {
+      evicted = removeBiggest();  // remove the biggest key
+      if (index == evicted) {
+        excluded++;
+        return EXCLUDED;          // input key is bigger than any of keys in hash
+      }
+      removed(evicted);
+    }
+    return index;
+  }
+
+  protected abstract int size();
+
+  protected abstract boolean store(int index);
+
+  protected abstract int removeBiggest();
+
+  protected abstract Iterable<Integer> indexes();
+
+  // key/value of the index is removed. retrieve memory usage
+  public void removed(int index) {
+    usage -= keys[index].length;
+    keys[index] = null;
+    if (values[index] != null) {
+      // value can be null if hash is flushed, which only keeps keys for limiting rows
+      usage -= values[index].length;
+      values[index] = null;
+    }
+    hashes[index] = -1;
+  }
+
+  public void set(int index, BytesWritable value) {
+    values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
+    usage += keys[index].length + values[index].length;
+  }
+
+  public void flush() throws IOException {
+    for (int index : indexes()) {
+      flush(index);
+    }
+    excluded = 0;
+  }
+
+  protected void flush(int index) throws IOException {
+    if (index != evicted && values[index] != null) {
+      // BytesWritable copies array for set method. So just creats new one
+      HiveKey keyWritable = new HiveKey(keys[index], hashes[index]);
+      BytesWritable valueWritable = new BytesWritable(values[index]);
+      collector.collect(keyWritable, valueWritable);
+      usage -= values[index].length;
+      values[index] = null;
+    }
+  }
+}
+
+/**
+ * for order by, same keys are counted (For 1-2-2-3-4, limit 3 is 1-2-2)
+ * MinMaxPriorityQueue is used because it alows duplication and fast access to biggest one
+ */
+class HashForRow extends TopNHash {
+
+  private final MinMaxPriorityQueue<Integer> indexes;
+
+  HashForRow(int topN, long threshold, BinaryCollector collector) {
+    super(topN, threshold, collector);
+    this.indexes = MinMaxPriorityQueue.orderedBy(C).create();
+  }
+
+  protected int size() {
+    return indexes.size();
+  }
+
+  // returns true always
+  protected boolean store(int index) {
+    return indexes.add(index);
+  }
+
+  protected int removeBiggest() {
+    return indexes.removeLast();
+  }
+
+  protected Iterable<Integer> indexes() {
+    Integer[] array = indexes.toArray(new Integer[indexes.size()]);
+    Arrays.sort(array, 0, array.length, C);
+    return Arrays.asList(array);
+  }
+}
+
+/**
+ * for group by, same keys are not counted (For 1-2-2-3-4, limit 3 is 1-2-(2)-3)
+ * simple TreeMap is used because group by does not need keep duplicated keys
+ */
+class HashForGroup extends TopNHash {
+
+  private final SortedSet<Integer> indexes;
+
+  HashForGroup(int topN, long threshold, BinaryCollector collector) {
+    super(topN, threshold, collector);
+    this.indexes = new TreeSet<Integer>(C);
+  }
+
+  protected int size() {
+    return indexes.size();
+  }
+
+  // returns false if index already exists in map
+  protected boolean store(int index) {
+    return indexes.add(index);
+  }
+
+  protected int removeBiggest() {
+    Integer last = indexes.last();
+    indexes.remove(last);
+    return last;
+  }
+
+  protected Iterable<Integer> indexes() {
+    return indexes;
+  }
+}
+
+class HashForLimit0 extends TopNHash {
+
+  HashForLimit0() {
+    super(0, 0, null);
+  }
+
+  @Override
+  public int indexOf(HiveKey key) {
+    return EXCLUDED;
+  }
+
+  protected int size() { return 0; }
+  protected boolean store(int index) { return false; }
+  protected int removeBiggest() { return 0; }
+  protected Iterable<Integer> indexes() { return Collections.emptyList(); }
+}
+

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Wed Aug 28 14:58:17 2013
@@ -35,6 +35,12 @@ public class HiveKey extends BytesWritab
     hashCodeValid = false;
   }
 
+  public HiveKey(byte[] bytes, int hashcode) {
+    super(bytes);
+    myHashCode = hashcode;
+    hashCodeValid = true;
+  }
+
   protected int myHashCode;
 
   public void setHashCode(int myHashCode) {

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java?rev=1518234&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java Wed Aug 28 14:58:17 2013
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Make RS calculate top-K selection for limit clause.
+ * It's only works with RS for limit operation which means between RS and LITMIT,
+ * there should not be other operators which may change number of rows like FilterOperator.
+ * see {@link Operator#acceptLimitPushdown}
+ *
+ * If RS is only for limiting rows, RSHash counts row with same key separately.
+ * But if RS is for GBY, RSHash should forward all the rows with the same key.
+ *
+ * Legend : A(a) --> key A, value a, row A(a)
+ *
+ * If each RS in mapper tasks is forwarded rows like this
+ *
+ * MAP1(RS) : 40(a)-10(b)-30(c)-10(d)-70(e)-80(f)
+ * MAP2(RS) : 90(g)-80(h)-60(i)-40(j)-30(k)-20(l)
+ * MAP3(RS) : 40(m)-50(n)-30(o)-30(p)-60(q)-70(r)
+ *
+ * OBY or GBY makes result like this,
+ *
+ * REDUCER : 10(b,d)-20(l)-30(c,k,o,p)-40(a,j,m)-50(n)-60(i,q)-70(e,r)-80(f,h)-90(g)
+ * LIMIT 3 for GBY: 10(b,d)-20(l)-30(c,k,o,p)
+ * LIMIT 3 for OBY: 10(b,d)-20(l)
+ *
+ * with the optimization, the amount of shuffling can be reduced, making identical result
+ *
+ * For GBY,
+ *
+ * MAP1 : 40(a)-10(b)-30(c)-10(d)
+ * MAP2 : 40(j)-30(k)-20(l)
+ * MAP3 : 40(m)-50(n)-30(o)-30(p)
+ *
+ * REDUCER : 10(b,d)-20(l)-30(c,k,o,p)-40(a,j,m)-50(n)
+ * LIMIT 3 : 10(b,d)-20(l)-30(c,k,o,p)
+ *
+ * For OBY,
+ *
+ * MAP1 : 10(b)-30(c)-10(d)
+ * MAP2 : 40(j)-30(k)-20(l)
+ * MAP3 : 40(m)-50(n)-30(o)
+ *
+ * REDUCER : 10(b,d)-20(l)-30(c,k,o)-40(j,m)-50(n)
+ * LIMIT 3 : 10(b,d)-20(l)
+ */
+public class LimitPushdownOptimizer implements Transform {
+
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("R1",
+        ReduceSinkOperator.getOperatorName() + "%" +
+        ".*" +
+        LimitOperator.getOperatorName() + "%"),
+        new TopNReducer());
+
+    LimitPushdownContext context = new LimitPushdownContext(pctx.getConf());
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, context);
+    GraphWalker ogw = new DefaultGraphWalker(disp);
+
+    List<Node> topNodes = new ArrayList<Node>(pctx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
+  private static class TopNReducer implements NodeProcessor {
+
+    public Object process(Node nd, Stack<Node> stack,
+        NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+      ReduceSinkOperator rs = null;
+      for (int i = stack.size() - 2 ; i >= 0; i--) {
+        Operator<?> operator = (Operator<?>) stack.get(i);
+        if (operator.getNumChild() != 1) {
+          return false; // multi-GBY single-RS (TODO)
+        }
+        if (operator instanceof ReduceSinkOperator) {
+          rs = (ReduceSinkOperator) operator;
+          break;
+        }
+        if (!operator.acceptLimitPushdown()) {
+          return false;
+        }
+      }
+      if (rs != null) {
+        List<List<Integer>> distincts = rs.getConf().getDistinctColumnIndices();
+        if (distincts != null && distincts.size() > 1) {
+          // multi distinct case. can not sure that it's safe just by multiplying limit value
+          return false;
+        }
+        LimitOperator limit = (LimitOperator) nd;
+        rs.getConf().setTopN(limit.getConf().getLimit());
+        rs.getConf().setTopNMemoryUsage(((LimitPushdownContext) procCtx).threshold);
+        if (rs.getNumChild() == 1 && rs.getChildren().get(0) instanceof GroupByOperator) {
+          rs.getConf().setMapGroupBy(true);
+        }
+      }
+      return true;
+    }
+  }
+
+  private static class LimitPushdownContext implements NodeProcessorCtx {
+
+    private float threshold;
+
+    public LimitPushdownContext(HiveConf conf) throws SemanticException {
+      threshold = conf.getFloatVar(HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE);
+      if (threshold <= 0 || threshold >= 1) {
+        throw new SemanticException("Invalid memory usage value " + threshold +
+            " for " + HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE);
+      }
+    }
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Wed Aug 28 14:58:17 2013
@@ -110,6 +110,9 @@ public class Optimizer {
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
       transformations.add(new CorrelationOptimizer());
     }
+    if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
+      transformations.add(new LimitPushdownOptimizer());
+    }
     transformations.add(new SimpleFetchOptimizer());  // must be called last
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Wed Aug 28 14:58:17 2013
@@ -68,6 +68,10 @@ public class ReduceSinkDesc extends Abst
 
   private int numReducers;
 
+  private int topN = -1;
+  private float topNMemoryUsage = -1;
+  private boolean mapGroupBy;  // for group-by, values with same key on top-K should be forwarded
+
   public ReduceSinkDesc() {
   }
 
@@ -178,6 +182,40 @@ public class ReduceSinkDesc extends Abst
     this.tag = tag;
   }
 
+  public int getTopN() {
+    return topN;
+  }
+
+  public void setTopN(int topN) {
+    this.topN = topN;
+  }
+
+  @Explain(displayName = "TopN")
+  public Integer getTopNExplain() {
+    return topN > 0 ? topN : null;
+  }
+
+  public float getTopNMemoryUsage() {
+    return topNMemoryUsage;
+  }
+
+  public void setTopNMemoryUsage(float topNMemoryUsage) {
+    this.topNMemoryUsage = topNMemoryUsage;
+  }
+
+  @Explain(displayName = "TopN Hash Memory Usage")
+  public Float getTopNMemoryUsageExplain() {
+    return topN > 0 && topNMemoryUsage > 0 ? topNMemoryUsage : null;
+  }
+
+  public boolean isMapGroupBy() {
+    return mapGroupBy;
+  }
+
+  public void setMapGroupBy(boolean mapGroupBy) {
+    this.mapGroupBy = mapGroupBy;
+  }
+
   /**
    * Returns the number of reducers for the map-reduce job. -1 means to decide
    * the number of reducers at runtime. This enables Hive to estimate the number

Added: hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q?rev=1518234&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q Wed Aug 28 14:58:17 2013
@@ -0,0 +1,66 @@
+set hive.limit.pushdown.memory.usage=0.3f;
+set hive.optimize.reducededuplication.min.reducer=1;
+
+-- HIVE-3562 Some limit can be pushed down to map stage
+
+explain
+select key,value from src order by key limit 20;
+select key,value from src order by key limit 20;
+
+explain
+select key,value from src order by key desc limit 20;
+select key,value from src order by key desc limit 20;
+
+explain
+select value, sum(key + 1) as sum from src group by value limit 20;
+select value, sum(key + 1) as sum from src group by value limit 20;
+
+-- deduped RS
+explain
+select value,avg(key + 1) from src group by value order by value limit 20;
+select value,avg(key + 1) from src group by value order by value limit 20;
+
+-- distincts
+explain
+select distinct(key) from src limit 20;
+select distinct(key) from src limit 20;
+
+explain
+select key, count(distinct(key)) from src group by key limit 20;
+select key, count(distinct(key)) from src group by key limit 20;
+
+-- limit zero
+explain
+select key,value from src order by key limit 0;
+select key,value from src order by key limit 0;
+
+-- 2MR (applied to last RS)
+explain
+select value, sum(key) as sum from src group by value order by sum limit 20;
+select value, sum(key) as sum from src group by value order by sum limit 20;
+
+-- subqueries
+explain
+select * from
+(select key, count(1) from src group by key order by key limit 2) subq
+join
+(select key, count(1) from src group by key limit 3) subq2
+on subq.key=subq2.key limit 4;
+
+set hive.map.aggr=false;
+-- map aggregation disabled
+explain
+select value, sum(key) as sum from src group by value limit 20;
+select value, sum(key) as sum from src group by value limit 20;
+
+set hive.limit.pushdown.memory.usage=0.00002f;
+
+-- flush for order-by
+explain
+select key,value,value,value,value,value,value,value,value from src order by key limit 100;
+select key,value,value,value,value,value,value,value,value from src order by key limit 100;
+
+-- flush for group-by
+explain
+select sum(key) as sum from src group by concat(key,value,value,value,value,value,value,value,value,value) limit 100;
+select sum(key) as sum from src group by concat(key,value,value,value,value,value,value,value,value,value) limit 100;

Added: hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q?rev=1518234&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q Wed Aug 28 14:58:17 2013
@@ -0,0 +1,22 @@
+set hive.limit.pushdown.memory.usage=0.3f;
+
+-- negative, RS + join
+explain select * from src a join src b on a.key=b.key limit 20;
+
+-- negative, RS + filter
+explain select value, sum(key) as sum from src group by value having sum > 100 limit 20;
+
+-- negative, RS + lateral view
+explain select key, L.* from (select * from src order by key) a lateral view explode(array(value, value)) L as v limit 10;
+
+-- negative, RS + forward + multi-groupby
+CREATE TABLE dest_2(key STRING, c1 INT);
+CREATE TABLE dest_3(key STRING, c1 INT);
+
+EXPLAIN FROM src
+INSERT OVERWRITE TABLE dest_2 SELECT value, sum(key) GROUP BY value
+INSERT OVERWRITE TABLE dest_3 SELECT value, sum(key) GROUP BY value limit 20;
+
+-- nagative, multi distinct
+explain
+select count(distinct key)+count(distinct value) from src limit 20;