Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/28 16:58:18 UTC
svn commit: r1518234 [1/2] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/a...
Author: hashutosh
Date: Wed Aug 28 14:58:17 2013
New Revision: 1518234
URL: http://svn.apache.org/r1518234
Log:
HIVE-3562 : Some limit can be pushed down to map stage
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q
hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q
hive/trunk/ql/src/test/results/clientpositive/limit_pushdown.q.out
hive/trunk/ql/src/test/results/clientpositive/limit_pushdown_negative.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/build.xml
hive/trunk/ql/ivy.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Aug 28 14:58:17 2013
@@ -525,6 +525,8 @@ public class HiveConf extends Configurat
HIVELIMITOPTLIMITFILE("hive.limit.optimize.limit.file", 10),
HIVELIMITOPTENABLE("hive.limit.optimize.enable", false),
HIVELIMITOPTMAXFETCH("hive.limit.optimize.fetch.max", 50000),
+ HIVELIMITPUSHDOWNMEMORYUSAGE("hive.limit.pushdown.memory.usage", -1f),
+
HIVEHASHTABLETHRESHOLD("hive.hashtable.initialCapacity", 100000),
HIVEHASHTABLELOADFACTOR("hive.hashtable.loadfactor", (float) 0.75),
HIVEHASHTABLEFOLLOWBYGBYMAXMEMORYUSAGE("hive.mapjoin.followby.gby.localtask.max.memory.usage", (float) 0.55),
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Wed Aug 28 14:58:17 2013
@@ -431,19 +431,19 @@
<property>
<name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name>
<value>0.3</value>
- <description>Portion of total memory to be used by map-side grup aggregation hash table, when this group by is followed by map join</description>
+ <description>Portion of total memory to be used by map-side group aggregation hash table, when this group by is followed by map join</description>
</property>
<property>
<name>hive.map.aggr.hash.force.flush.memory.threshold</name>
<value>0.9</value>
- <description>The max memory to be used by map-side grup aggregation hash table, if the memory usage is higher than this number, force to flush data</description>
+ <description>The max memory to be used by map-side group aggregation hash table, if the memory usage is higher than this number, force to flush data</description>
</property>
<property>
<name>hive.map.aggr.hash.percentmemory</name>
<value>0.5</value>
- <description>Portion of total memory to be used by map-side grup aggregation hash table</description>
+ <description>Portion of total memory to be used by map-side group aggregation hash table</description>
</property>
<property>
@@ -1584,6 +1584,12 @@
</property>
<property>
+ <name>hive.limit.pushdown.memory.usage</name>
+ <value>0.3f</value>
+ <description>The max memory to be used by the hash in the RS operator for top-K selection, expressed as a fraction of the JVM max heap.</description>
+</property>
+
+<property>
<name>hive.rework.mapredwork</name>
<value>false</value>
<description>should rework the mapred work or not.
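
For context on the new property: the value is read as a fraction of the JVM max heap (see createTopKHash in ReduceSinkOperator.java below), and any value <= 0 leaves the optimization off. A hedged sketch of enabling it programmatically, assuming a setFloatVar counterpart to the getFloatVar call used in Optimizer.java below:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableLimitPushdown {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // allow the RS top-K hash to use up to 30% of the max heap;
        // the -1f default added to HiveConf.java above disables the optimizer
        conf.setFloatVar(HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE, 0.3f);
      }
    }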
Modified: hive/trunk/ql/build.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/build.xml?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/build.xml (original)
+++ hive/trunk/ql/build.xml Wed Aug 28 14:58:17 2013
@@ -250,15 +250,24 @@
<exclude name="META-INF/MANIFEST.MF"/>
</patternset>
</unzip>
- <unzip
- src="${build.ivy.lib.dir}/default/protobuf-java-${protobuf.version}.jar"
+ <unzip
+ src="${build.ivy.lib.dir}/default/protobuf-java-${protobuf.version}.jar"
dest="${build.dir.hive}/protobuf-java/classes">
<patternset>
<exclude name="META-INF"/>
<exclude name="META-INF/MANIFEST.MF"/>
</patternset>
</unzip>
- <unzip
+ <unzip
+ src="${build.ivy.lib.dir}/default/guava-${guava.version}.jar"
+ dest="${build.dir.hive}/guava/classes">
+ <patternset>
+ <exclude name="META-INF"/>
+ <exclude name="META-INF/MANIFEST.MF"/>
+ </patternset>
+ </unzip>
+
+ <unzip
src="${build.ivy.lib.dir}/default/snappy-${snappy.version}.jar"
dest="${build.dir.hive}/snappy/classes">
<patternset>
@@ -296,14 +305,11 @@
<fileset dir="${build.dir.hive}/shims/classes" includes="**/*.class"/>
<fileset dir="${build.dir.hive}/javaewah/classes" includes="**/*.class"/>
<fileset dir="${build.dir.hive}/javolution/classes" includes="**/*.class"/>
- <fileset dir="${build.dir.hive}/protobuf-java/classes"
- includes="**/*.class"/>
- <fileset dir="${build.dir.hive}/snappy/classes"
- includes="**/*.class"/>
- <fileset dir="${build.dir.hive}/jackson-core-asl/classes"
- includes="**/*.class"/>
- <fileset dir="${build.dir.hive}/jackson-mapper-asl/classes"
- includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/protobuf-java/classes" includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/snappy/classes" includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/jackson-core-asl/classes" includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/jackson-mapper-asl/classes" includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/guava/classes" includes="**/*.class"/>
<manifest>
<!-- Not putting these in their own manifest section, since that inserts
a new-line, which breaks the reading of the attributes. -->
Modified: hive/trunk/ql/ivy.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/ivy.xml?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/ivy.xml (original)
+++ hive/trunk/ql/ivy.xml Wed Aug 28 14:58:17 2013
@@ -47,7 +47,6 @@
<dependency org="org.iq80.snappy" name="snappy"
rev="${snappy.version}" transitive="false"/>
- <!-- hadoop specific guava -->
<dependency org="org.json" name="json" rev="${json.version}"/>
<dependency org="commons-collections" name="commons-collections" rev="${commons-collections.version}"/>
<dependency org="commons-configuration" name="commons-configuration" rev="${commons-configuration.version}"
@@ -80,6 +79,8 @@
<exclude org="org.apache.commons" module="commons-daemon"/><!--bad POM-->
</dependency>
+ <dependency org="com.google.guava" name="guava" rev="${guava.version}" transitive="false"/>
+
<!-- Test Dependencies -->
<dependency org="junit" name="junit" rev="${junit.version}" conf="test->default" />
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Wed Aug 28 14:58:17 2013
@@ -50,6 +50,11 @@ public class ExtractOperator extends Ope
return OperatorType.EXTRACT;
}
+ @Override
+ public boolean acceptLimitPushdown() {
+ return true;
+ }
+
/**
* @return the name of the operator
*/
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Wed Aug 28 14:58:17 2013
@@ -41,6 +41,11 @@ public class ForwardOperator extends Ope
return OperatorType.FORWARD;
}
+ @Override
+ public boolean acceptLimitPushdown() {
+ return true;
+ }
+
/**
* @return the name of the operator
*/
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Wed Aug 28 14:58:17 2013
@@ -1181,4 +1181,15 @@ public class GroupByOperator extends Ope
public OperatorType getType() {
return OperatorType.GROUPBY;
}
+
+ /**
+ * We can push the limit above a GBY running in the reducer, since that will generate a
+ * single row for each group. This doesn't necessarily hold for a GBY running in mappers,
+ * so we don't push the limit above it.
+ */
+ @Override
+ public boolean acceptLimitPushdown() {
+ return getConf().getMode() == GroupByDesc.Mode.MERGEPARTIAL ||
+ getConf().getMode() == GroupByDesc.Mode.COMPLETE;
+ }
}
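
The mode restriction above deserves a concrete illustration: a map-side GBY (hash mode) emits partial aggregates, so a limit applied above it could drop rows that are still needed to assemble final groups; only the reduce-side MERGEPARTIAL/COMPLETE modes emit exactly one row per group. A self-contained sketch with made-up numbers:

    import java.util.Map;

    public class PartialAggDemo {
      public static void main(String[] args) {
        // two mappers each emit a partial count for key "x" (hash-mode GBY)
        Map<String, Integer> mapper1 = Map.of("x", 2, "y", 1);
        Map<String, Integer> mapper2 = Map.of("x", 3);
        // only after the reduce-side merge is there one row per group,
        // so that is where a pushed-down limit remains correct
        int x = mapper1.get("x") + mapper2.get("x");
        System.out.println("x=" + x + ", y=" + mapper1.get("y"));
      }
    }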
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Aug 28 14:58:17 2013
@@ -592,6 +592,9 @@ public abstract class Operator<T extends
state = State.CLOSE;
LOG.info(id + " finished. closing... ");
+ // call the operator specific close routine
+ closeOp(abort);
+
if (counterNameToEnum != null) {
incrCounter(numInputRowsCntr, inputRows);
incrCounter(numOutputRowsCntr, outputRows);
@@ -600,9 +603,6 @@ public abstract class Operator<T extends
LOG.info(id + " forwarded " + cntr + " rows");
- // call the operator specific close routine
- closeOp(abort);
-
try {
logStats();
if (childOperators == null) {
@@ -816,13 +816,7 @@ public abstract class Operator<T extends
}
}
- if (isLogInfoEnabled) {
- cntr++;
- if (cntr == nextCntr) {
- LOG.info(id + " forwarding " + cntr + " rows");
- nextCntr = getNextCntr(cntr);
- }
- }
+ increaseForward(1);
// For debugging purposes:
// System.out.println("" + this.getClass() + ": " +
@@ -855,6 +849,18 @@ public abstract class Operator<T extends
}
}
+ void increaseForward(long counter) {
+ if (isLogInfoEnabled) {
+ cntr += counter;
+ if (cntr >= nextCntr) {
+ LOG.info(id + " forwarding " + cntr + " rows");
+ do {
+ nextCntr = getNextCntr(nextCntr);
+ } while(cntr >= nextCntr);
+ }
+ }
+ }
+
public void resetStats() {
for (Enum<?> e : statsMap.keySet()) {
statsMap.get(e).set(0L);
@@ -1496,6 +1502,17 @@ public abstract class Operator<T extends
return true;
}
+ /**
+ * used for LimitPushdownOptimizer
+ *
+ * If none of the operators between the limit and the reduce-sink removes any input rows
+ * within the range of the limit count, the limit can be pushed down to the reduce-sink
+ * operator. Examples: forward, select, etc.
+ */
+ public boolean acceptLimitPushdown() {
+ return false;
+ }
+
@Override
public String toString() {
return getName() + "[" + getIdentifier() + "]";
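
A note on the counter rework above: collect() in ReduceSinkOperator (below) now reports forwarded rows in batches of 1000, so a single increaseForward call can jump past several logging thresholds; that is what the do/while guards. A standalone sketch of the loop, assuming threshold growth similar to Hive's existing getNextCntr:

    public class ForwardCounterDemo {
      static long cntr = 0, nextCntr = 1;

      // assumed growth curve, modeled on Operator.getNextCntr
      static long getNextCntr(long cur) {
        return cur >= 1000000 ? cur + 1000000 : 10 * cur;
      }

      static void increaseForward(long counter) {
        cntr += counter;
        if (cntr >= nextCntr) {
          System.out.println("forwarding " + cntr + " rows");
          do {                      // skip every threshold the batch jumped over
            nextCntr = getNextCntr(nextCntr);
          } while (cntr >= nextCntr);
        }
      }

      public static void main(String[] args) {
        increaseForward(1000);      // one batch skips thresholds 1, 10, 100, 1000
        increaseForward(1000);      // below the next threshold (10000): no log line
      }
    }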
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Wed Aug 28 14:58:17 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -44,13 +43,12 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
/**
* Reduce Sink Operator sends output to the reduce stage.
**/
public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
- implements Serializable {
+ implements Serializable, TopNHash.BinaryCollector {
private static final long serialVersionUID = 1L;
@@ -90,6 +88,9 @@ public class ReduceSinkOperator extends
return inputAlias;
}
+ // picks top-N K:V pairs from the input; can be null
+ private transient TopNHash reducerHash;
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
@@ -131,6 +132,8 @@ public class ReduceSinkOperator extends
.newInstance();
valueSerializer.initialize(null, valueTableDesc.getProperties());
+ reducerHash = createTopKHash();
+
firstRow = true;
initializeChildren(hconf);
} catch (Exception e) {
@@ -139,14 +142,44 @@ public class ReduceSinkOperator extends
}
}
+ private TopNHash createTopKHash() {
+ int limit = conf.getTopN();
+ float percent = conf.getTopNMemoryUsage();
+ if (limit < 0 || percent <= 0) {
+ return null;
+ }
+ if (limit == 0) {
+ return TopNHash.create0();
+ }
+ // limit * 64 : compensation for the key/value/hashcode arrays
+ long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
+ if (threshold < 0) {
+ return null;
+ }
+ return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
+ }
+
transient InspectableObject tempInspectableObject = new InspectableObject();
transient HiveKey keyWritable = new HiveKey();
- transient Writable value;
transient StructObjectInspector keyObjectInspector;
transient StructObjectInspector valueObjectInspector;
transient ObjectInspector[] partitionObjectInspectors;
+ /**
+ * This two-dimensional array holds key data and a corresponding Union object
+ * which contains the tag identifying the aggregate expression for distinct columns.
+ *
+ * If there is no distinct expression, cachedKeys is simply like this:
+ * cachedKeys[0] = [col0][col1]
+ *
+ * With two distinct expressions, union(tag:key) is attached for each distinct expression:
+ * cachedKeys[0] = [col0][col1][0:dist1]
+ * cachedKeys[1] = [col0][col1][1:dist2]
+ *
+ * In this case, the child GBY evaluates distinct values with an expression like KEY.col2:0.dist1;
+ * see {@link ExprNodeColumnEvaluator}
+ */
transient Object[][] cachedKeys;
transient Object[] cachedValues;
transient List<List<Integer>> distinctColIndices;
@@ -198,6 +231,7 @@ public class ReduceSinkOperator extends
}
@Override
+ @SuppressWarnings("unchecked")
public void processOp(Object row, int tag) throws HiveException {
try {
ObjectInspector rowInspector = inputObjInspectors[tag];
@@ -241,8 +275,6 @@ public class ReduceSinkOperator extends
for (int i = 0; i < valueEval.length; i++) {
cachedValues[i] = valueEval[i].evaluate(row);
}
- // Serialize the value
- value = valueSerializer.serialize(cachedValues, valueObjectInspector);
// Evaluate the keys
Object[] distributionKeys = new Object[numDistributionKeys];
@@ -267,6 +299,8 @@ public class ReduceSinkOperator extends
// no distinct key
System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
}
+
+ BytesWritable value = null;
// Serialize the keys and append the tag
for (int i = 0; i < cachedKeys.length; i++) {
if (keyIsText) {
@@ -294,26 +328,85 @@ public class ReduceSinkOperator extends
}
}
keyWritable.setHashCode(keyHashCode);
- if (out != null) {
- out.collect(keyWritable, value);
- // Since this is a terminal operator, update counters explicitly -
- // forward is not called
- if (counterNameToEnum != null) {
- ++outputRows;
- if (outputRows % 1000 == 0) {
- incrCounter(numOutputRowsCntr, outputRows);
- outputRows = 0;
+
+ if (reducerHash == null) {
+ if (null != out) {
+ collect(keyWritable, value = getValue(row, value));
+ }
+ } else {
+ int index = reducerHash.indexOf(keyWritable);
+ if (index == TopNHash.EXCLUDED) {
+ continue;
+ }
+ value = getValue(row, value);
+ if (index >= 0) {
+ reducerHash.set(index, value);
+ } else {
+ if (index == TopNHash.FORWARD) {
+ collect(keyWritable, value);
+ } else if (index == TopNHash.FLUSH) {
+ LOG.info("Top-N hash is flushed");
+ reducerHash.flush();
+ // we can now retry adding key/value into hash, which is flushed.
+ // but for simplicity, just forward them
+ collect(keyWritable, value);
+ } else if (index == TopNHash.DISABLE) {
+ LOG.info("Top-N hash is disabled");
+ reducerHash.flush();
+ collect(keyWritable, value);
+ reducerHash = null;
}
}
}
}
- } catch (SerDeException e) {
- throw new HiveException(e);
- } catch (IOException e) {
+ } catch (HiveException e) {
+ throw e;
+ } catch (Exception e) {
throw new HiveException(e);
}
}
+ public void collect(BytesWritable key, BytesWritable value) throws IOException {
+ // Since this is a terminal operator, update counters explicitly -
+ // forward is not called
+ out.collect(key, value);
+ if (++outputRows % 1000 == 0) {
+ if (counterNameToEnum != null) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ }
+ increaseForward(outputRows);
+ outputRows = 0;
+ }
+ }
+
+ // evaluate value lazily
+ private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
+ if (value != null) {
+ return value;
+ }
+ // Evaluate the value
+ for (int i = 0; i < valueEval.length; i++) {
+ cachedValues[i] = valueEval[i].evaluate(row);
+ }
+ // Serialize the value
+ return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector);
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ if (!abort && reducerHash != null) {
+ try {
+ reducerHash.flush();
+ } catch (IOException e) {
+ throw new HiveException(e);
+ } finally {
+ reducerHash = null;
+ }
+ }
+ reducerHash = null;
+ super.closeOp(abort);
+ }
+
/**
* @return the name of the operator
*/
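
To make the createTopKHash threshold above concrete: with the template default of 0.3 and a hypothetical 1 GB max heap, the hash gets roughly a 300 MB budget, minus 64 bytes per retained row for the key/value/hashcode arrays. The same arithmetic as a sketch (the heap size is illustrative):

    public class ThresholdDemo {
      public static void main(String[] args) {
        float percent = 0.3f;          // hive.limit.pushdown.memory.usage
        int limit = 100;               // LIMIT 100
        long maxMemory = 1L << 30;     // pretend Runtime.maxMemory() == 1 GB
        long threshold = (long) (percent * maxMemory) - limit * 64;
        System.out.println(threshold); // ~322 million bytes for the hash
      }
    }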
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Wed Aug 28 14:58:17 2013
@@ -124,4 +124,9 @@ public class SelectOperator extends Oper
public boolean supportUnionRemoveOptimization() {
return true;
}
+
+ @Override
+ public boolean acceptLimitPushdown() {
+ return true;
+ }
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1518234&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Wed Aug 28 14:58:17 2013
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.OutputCollector;
+
+/**
+ * Stores binary key/value in sorted manner to get top-n key/value
+ */
+abstract class TopNHash {
+
+ /**
+ * For interaction between operator and top-n hash.
+ * Currently only used to forward key/values stored in hash.
+ */
+ public static interface BinaryCollector extends OutputCollector<BytesWritable, BytesWritable> {
+ }
+
+ protected static final int FORWARD = -1;
+ protected static final int EXCLUDED = -2;
+ protected static final int FLUSH = -3;
+ protected static final int DISABLE = -4;
+
+ protected final int topN;
+ protected final BinaryCollector collector;
+
+ protected final long threshold; // max heap size
+ protected long usage; // heap usage (not exact)
+
+ // binary keys, binary values and hashcodes of keys, lined up by index
+ protected final byte[][] keys;
+ protected final byte[][] values;
+ protected final int[] hashes;
+
+ protected int evicted; // recently evicted index (the biggest one, reused for the next key/value)
+ protected int excluded; // count of excluded rows from previous flush
+
+ protected final Comparator<Integer> C = new Comparator<Integer>() {
+ public int compare(Integer o1, Integer o2) {
+ byte[] key1 = keys[o1];
+ byte[] key2 = keys[o2];
+ return WritableComparator.compareBytes(key1, 0, key1.length, key2, 0, key2.length);
+ }
+ };
+
+ public static TopNHash create0() {
+ return new HashForLimit0();
+ }
+
+ public static TopNHash create(boolean grouped, int topN, long threshold,
+ BinaryCollector collector) {
+ if (topN == 0) {
+ return new HashForLimit0();
+ }
+ if (grouped) {
+ return new HashForGroup(topN, threshold, collector);
+ }
+ return new HashForRow(topN, threshold, collector);
+ }
+
+ TopNHash(int topN, long threshold, BinaryCollector collector) {
+ this.topN = topN;
+ this.threshold = threshold;
+ this.collector = collector;
+ this.keys = new byte[topN + 1][];
+ this.values = new byte[topN + 1][];
+ this.hashes = new int[topN + 1];
+ this.evicted = topN;
+ }
+
+ /**
+ * Returns the index for the key/value/hashcode if it is acceptable;
+ * -1, -2, -3, or -4 is returned for other actions.
+ * <p/>
+ * -1 for FORWARD : should be forwarded to output collector (for GBY)
+ * -2 for EXCLUDED : not in top-k. ignore it
+ * -3 for FLUSH : memory is not enough. flush values (keep keys only)
+ * -4 for DISABLE : hash is not effective. flush and disable it
+ */
+ public int indexOf(HiveKey key) {
+ int size = size();
+ if (usage > threshold) {
+ return excluded == 0 ? DISABLE : FLUSH;
+ }
+ int index = size < topN ? size : evicted;
+ keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+ hashes[index] = key.hashCode();
+ if (!store(index)) {
+ // this happens only for GBY, which should forward all values associated with a key in the
+ // range of the limit. The new value should be attached to the key, but the current
+ // implementation allows only one value. With map-aggregation enabled (true by default),
+ // this is not a common case, so just forward the new key/value and forget it (todo)
+ return FORWARD;
+ }
+ if (size == topN) {
+ evicted = removeBiggest(); // remove the biggest key
+ if (index == evicted) {
+ excluded++;
+ return EXCLUDED; // input key is bigger than any of keys in hash
+ }
+ removed(evicted);
+ }
+ return index;
+ }
+
+ protected abstract int size();
+
+ protected abstract boolean store(int index);
+
+ protected abstract int removeBiggest();
+
+ protected abstract Iterable<Integer> indexes();
+
+ // key/value at the index is removed; reclaim memory usage
+ public void removed(int index) {
+ usage -= keys[index].length;
+ keys[index] = null;
+ if (values[index] != null) {
+ // value can be null if hash is flushed, which only keeps keys for limiting rows
+ usage -= values[index].length;
+ values[index] = null;
+ }
+ hashes[index] = -1;
+ }
+
+ public void set(int index, BytesWritable value) {
+ values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
+ usage += keys[index].length + values[index].length;
+ }
+
+ public void flush() throws IOException {
+ for (int index : indexes()) {
+ flush(index);
+ }
+ excluded = 0;
+ }
+
+ protected void flush(int index) throws IOException {
+ if (index != evicted && values[index] != null) {
+ // BytesWritable copies the array in its set method, so just create new ones
+ HiveKey keyWritable = new HiveKey(keys[index], hashes[index]);
+ BytesWritable valueWritable = new BytesWritable(values[index]);
+ collector.collect(keyWritable, valueWritable);
+ usage -= values[index].length;
+ values[index] = null;
+ }
+ }
+}
+
+/**
+ * For order by, same keys are counted (for 1-2-2-3-4, limit 3 is 1-2-2).
+ * MinMaxPriorityQueue is used because it allows duplicates and fast access to the biggest one.
+ */
+class HashForRow extends TopNHash {
+
+ private final MinMaxPriorityQueue<Integer> indexes;
+
+ HashForRow(int topN, long threshold, BinaryCollector collector) {
+ super(topN, threshold, collector);
+ this.indexes = MinMaxPriorityQueue.orderedBy(C).create();
+ }
+
+ protected int size() {
+ return indexes.size();
+ }
+
+ // returns true always
+ protected boolean store(int index) {
+ return indexes.add(index);
+ }
+
+ protected int removeBiggest() {
+ return indexes.removeLast();
+ }
+
+ protected Iterable<Integer> indexes() {
+ Integer[] array = indexes.toArray(new Integer[indexes.size()]);
+ Arrays.sort(array, 0, array.length, C);
+ return Arrays.asList(array);
+ }
+}
+
+/**
+ * For group by, same keys are not counted (for 1-2-2-3-4, limit 3 is 1-2-(2)-3).
+ * A simple TreeSet is used because group by does not need to keep duplicated keys.
+ */
+class HashForGroup extends TopNHash {
+
+ private final SortedSet<Integer> indexes;
+
+ HashForGroup(int topN, long threshold, BinaryCollector collector) {
+ super(topN, threshold, collector);
+ this.indexes = new TreeSet<Integer>(C);
+ }
+
+ protected int size() {
+ return indexes.size();
+ }
+
+ // returns false if the index already exists in the set
+ protected boolean store(int index) {
+ return indexes.add(index);
+ }
+
+ protected int removeBiggest() {
+ Integer last = indexes.last();
+ indexes.remove(last);
+ return last;
+ }
+
+ protected Iterable<Integer> indexes() {
+ return indexes;
+ }
+}
+
+class HashForLimit0 extends TopNHash {
+
+ HashForLimit0() {
+ super(0, 0, null);
+ }
+
+ @Override
+ public int indexOf(HiveKey key) {
+ return EXCLUDED;
+ }
+
+ protected int size() { return 0; }
+ protected boolean store(int index) { return false; }
+ protected int removeBiggest() { return 0; }
+ protected Iterable<Integer> indexes() { return Collections.emptyList(); }
+}
+
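
Reading indexOf's return codes in one place: a hedged sketch of the caller-side protocol, condensed from the processOp changes in ReduceSinkOperator.java above (the offer/collect names are illustrative; TopNHash and its constants are package-private, so this would live in org.apache.hadoop.hive.ql.exec):

    void offer(TopNHash hash, HiveKey key, BytesWritable value) throws IOException {
      int index = hash.indexOf(key);
      if (index == TopNHash.EXCLUDED) {
        return;                      // bigger than every retained key: drop the row
      }
      if (index >= 0) {
        hash.set(index, value);      // admitted: store the value at its slot
      } else if (index == TopNHash.FORWARD) {
        collect(key, value);         // duplicate GBY key: forward instead of storing
      } else if (index == TopNHash.FLUSH) {
        hash.flush();                // over budget: emit stored values, keep keys
        collect(key, value);
      } else if (index == TopNHash.DISABLE) {
        hash.flush();                // hash not effective: drain it and stop using it
        collect(key, value);
      }
    }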
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Wed Aug 28 14:58:17 2013
@@ -35,6 +35,12 @@ public class HiveKey extends BytesWritab
hashCodeValid = false;
}
+ public HiveKey(byte[] bytes, int hashcode) {
+ super(bytes);
+ myHashCode = hashcode;
+ hashCodeValid = true;
+ }
+
protected int myHashCode;
public void setHashCode(int myHashCode) {
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java?rev=1518234&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java Wed Aug 28 14:58:17 2013
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * Makes the RS calculate a top-K selection for the limit clause.
+ * It only works with an RS feeding a LIMIT, which means that between the RS and the LIMIT
+ * there should be no other operators that may change the number of rows, like FilterOperator;
+ * see {@link Operator#acceptLimitPushdown}
+ *
+ * If the RS is only for limiting rows, the RS hash counts rows with the same key separately.
+ * But if the RS is for a GBY, the RS hash should forward all the rows with the same key.
+ *
+ * Legend : A(a) --> key A, value a, row A(a)
+ *
+ * If each RS in mapper tasks is forwarded rows like this
+ *
+ * MAP1(RS) : 40(a)-10(b)-30(c)-10(d)-70(e)-80(f)
+ * MAP2(RS) : 90(g)-80(h)-60(i)-40(j)-30(k)-20(l)
+ * MAP3(RS) : 40(m)-50(n)-30(o)-30(p)-60(q)-70(r)
+ *
+ * OBY or GBY makes result like this,
+ *
+ * REDUCER : 10(b,d)-20(l)-30(c,k,o,p)-40(a,j,m)-50(n)-60(i,q)-70(e,r)-80(f,h)-90(g)
+ * LIMIT 3 for GBY: 10(b,d)-20(l)-30(c,k,o,p)
+ * LIMIT 3 for OBY: 10(b,d)-20(l)
+ *
+ * With the optimization, the amount of shuffling can be reduced while producing an identical result.
+ *
+ * For GBY,
+ *
+ * MAP1 : 40(a)-10(b)-30(c)-10(d)
+ * MAP2 : 40(j)-30(k)-20(l)
+ * MAP3 : 40(m)-50(n)-30(o)-30(p)
+ *
+ * REDUCER : 10(b,d)-20(l)-30(c,k,o,p)-40(a,j,m)-50(n)
+ * LIMIT 3 : 10(b,d)-20(l)-30(c,k,o,p)
+ *
+ * For OBY,
+ *
+ * MAP1 : 10(b)-30(c)-10(d)
+ * MAP2 : 40(j)-30(k)-20(l)
+ * MAP3 : 40(m)-50(n)-30(o)
+ *
+ * REDUCER : 10(b,d)-20(l)-30(c,k,o)-40(j,m)-50(n)
+ * LIMIT 3 : 10(b,d)-20(l)
+ */
+public class LimitPushdownOptimizer implements Transform {
+
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("R1",
+ ReduceSinkOperator.getOperatorName() + "%" +
+ ".*" +
+ LimitOperator.getOperatorName() + "%"),
+ new TopNReducer());
+
+ LimitPushdownContext context = new LimitPushdownContext(pctx.getConf());
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, context);
+ GraphWalker ogw = new DefaultGraphWalker(disp);
+
+ List<Node> topNodes = new ArrayList<Node>(pctx.getTopOps().values());
+ ogw.startWalking(topNodes, null);
+ return pctx;
+ }
+
+ private static class TopNReducer implements NodeProcessor {
+
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ ReduceSinkOperator rs = null;
+ for (int i = stack.size() - 2 ; i >= 0; i--) {
+ Operator<?> operator = (Operator<?>) stack.get(i);
+ if (operator.getNumChild() != 1) {
+ return false; // multi-GBY single-RS (TODO)
+ }
+ if (operator instanceof ReduceSinkOperator) {
+ rs = (ReduceSinkOperator) operator;
+ break;
+ }
+ if (!operator.acceptLimitPushdown()) {
+ return false;
+ }
+ }
+ if (rs != null) {
+ List<List<Integer>> distincts = rs.getConf().getDistinctColumnIndices();
+ if (distincts != null && distincts.size() > 1) {
+ // multi-distinct case. cannot be sure it's safe just by multiplying the limit value
+ return false;
+ }
+ LimitOperator limit = (LimitOperator) nd;
+ rs.getConf().setTopN(limit.getConf().getLimit());
+ rs.getConf().setTopNMemoryUsage(((LimitPushdownContext) procCtx).threshold);
+ if (rs.getNumChild() == 1 && rs.getChildren().get(0) instanceof GroupByOperator) {
+ rs.getConf().setMapGroupBy(true);
+ }
+ }
+ return true;
+ }
+ }
+
+ private static class LimitPushdownContext implements NodeProcessorCtx {
+
+ private float threshold;
+
+ public LimitPushdownContext(HiveConf conf) throws SemanticException {
+ threshold = conf.getFloatVar(HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE);
+ if (threshold <= 0 || threshold >= 1) {
+ throw new SemanticException("Invalid memory usage value " + threshold +
+ " for " + HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE);
+ }
+ }
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Wed Aug 28 14:58:17 2013
@@ -110,6 +110,9 @@ public class Optimizer {
!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
transformations.add(new CorrelationOptimizer());
}
+ if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
+ transformations.add(new LimitPushdownOptimizer());
+ }
transformations.add(new SimpleFetchOptimizer()); // must be called last
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1518234&r1=1518233&r2=1518234&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Wed Aug 28 14:58:17 2013
@@ -68,6 +68,10 @@ public class ReduceSinkDesc extends Abst
private int numReducers;
+ private int topN = -1;
+ private float topNMemoryUsage = -1;
+ private boolean mapGroupBy; // for group-by, values with the same key in the top-K should be forwarded
+
public ReduceSinkDesc() {
}
@@ -178,6 +182,40 @@ public class ReduceSinkDesc extends Abst
this.tag = tag;
}
+ public int getTopN() {
+ return topN;
+ }
+
+ public void setTopN(int topN) {
+ this.topN = topN;
+ }
+
+ @Explain(displayName = "TopN")
+ public Integer getTopNExplain() {
+ return topN > 0 ? topN : null;
+ }
+
+ public float getTopNMemoryUsage() {
+ return topNMemoryUsage;
+ }
+
+ public void setTopNMemoryUsage(float topNMemoryUsage) {
+ this.topNMemoryUsage = topNMemoryUsage;
+ }
+
+ @Explain(displayName = "TopN Hash Memory Usage")
+ public Float getTopNMemoryUsageExplain() {
+ return topN > 0 && topNMemoryUsage > 0 ? topNMemoryUsage : null;
+ }
+
+ public boolean isMapGroupBy() {
+ return mapGroupBy;
+ }
+
+ public void setMapGroupBy(boolean mapGroupBy) {
+ this.mapGroupBy = mapGroupBy;
+ }
+
/**
* Returns the number of reducers for the map-reduce job. -1 means to decide
* the number of reducers at runtime. This enables Hive to estimate the number
Added: hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q?rev=1518234&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown.q Wed Aug 28 14:58:17 2013
@@ -0,0 +1,66 @@
+set hive.limit.pushdown.memory.usage=0.3f;
+set hive.optimize.reducededuplication.min.reducer=1;
+
+-- HIVE-3562 Some limit can be pushed down to map stage
+
+explain
+select key,value from src order by key limit 20;
+select key,value from src order by key limit 20;
+
+explain
+select key,value from src order by key desc limit 20;
+select key,value from src order by key desc limit 20;
+
+explain
+select value, sum(key + 1) as sum from src group by value limit 20;
+select value, sum(key + 1) as sum from src group by value limit 20;
+
+-- deduped RS
+explain
+select value,avg(key + 1) from src group by value order by value limit 20;
+select value,avg(key + 1) from src group by value order by value limit 20;
+
+-- distincts
+explain
+select distinct(key) from src limit 20;
+select distinct(key) from src limit 20;
+
+explain
+select key, count(distinct(key)) from src group by key limit 20;
+select key, count(distinct(key)) from src group by key limit 20;
+
+-- limit zero
+explain
+select key,value from src order by key limit 0;
+select key,value from src order by key limit 0;
+
+-- 2MR (applied to last RS)
+explain
+select value, sum(key) as sum from src group by value order by sum limit 20;
+select value, sum(key) as sum from src group by value order by sum limit 20;
+
+-- subqueries
+explain
+select * from
+(select key, count(1) from src group by key order by key limit 2) subq
+join
+(select key, count(1) from src group by key limit 3) subq2
+on subq.key=subq2.key limit 4;
+
+set hive.map.aggr=false;
+-- map aggregation disabled
+explain
+select value, sum(key) as sum from src group by value limit 20;
+select value, sum(key) as sum from src group by value limit 20;
+
+set hive.limit.pushdown.memory.usage=0.00002f;
+
+-- flush for order-by
+explain
+select key,value,value,value,value,value,value,value,value from src order by key limit 100;
+select key,value,value,value,value,value,value,value,value from src order by key limit 100;
+
+-- flush for group-by
+explain
+select sum(key) as sum from src group by concat(key,value,value,value,value,value,value,value,value,value) limit 100;
+select sum(key) as sum from src group by concat(key,value,value,value,value,value,value,value,value,value) limit 100;
Added: hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q?rev=1518234&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/limit_pushdown_negative.q Wed Aug 28 14:58:17 2013
@@ -0,0 +1,22 @@
+set hive.limit.pushdown.memory.usage=0.3f;
+
+-- negative, RS + join
+explain select * from src a join src b on a.key=b.key limit 20;
+
+-- negative, RS + filter
+explain select value, sum(key) as sum from src group by value having sum > 100 limit 20;
+
+-- negative, RS + lateral view
+explain select key, L.* from (select * from src order by key) a lateral view explode(array(value, value)) L as v limit 10;
+
+-- negative, RS + forward + multi-groupby
+CREATE TABLE dest_2(key STRING, c1 INT);
+CREATE TABLE dest_3(key STRING, c1 INT);
+
+EXPLAIN FROM src
+INSERT OVERWRITE TABLE dest_2 SELECT value, sum(key) GROUP BY value
+INSERT OVERWRITE TABLE dest_3 SELECT value, sum(key) GROUP BY value limit 20;
+
+-- negative, multi distinct
+explain
+select count(distinct key)+count(distinct value) from src limit 20;