You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by tc...@apache.org on 2018/08/17 13:34:44 UTC
hive git commit: HIVE-20368: Remove VectorTopNKeyOperator lock (Teddy
Choi, reviewed by Jesus Camacho Rodriguez)
Repository: hive
Updated Branches:
refs/heads/master ccdcc5e2e -> 513ee73b7
HIVE-20368: Remove VectorTopNKeyOperator lock (Teddy Choi, reviewed by Jesus Camacho Rodriguez)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/513ee73b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/513ee73b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/513ee73b
Branch: refs/heads/master
Commit: 513ee73b77a86c036fbcc424bfc5c70da817c98b
Parents: ccdcc5e
Author: Teddy Choi <pu...@gmail.com>
Authored: Fri Aug 17 22:31:09 2018 +0900
Committer: Teddy Choi <pu...@gmail.com>
Committed: Fri Aug 17 22:31:09 2018 +0900
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/TopNKeyOperator.java | 62 ++-----
.../ql/exec/vector/VectorTopNKeyOperator.java | 163 +------------------
2 files changed, 23 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/513ee73b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
index 3dfeeaf..4734824 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
@@ -47,21 +46,8 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializab
// Priority queue that holds occurred keys
private transient PriorityQueue<KeyWrapper> priorityQueue;
- // Fast key wrapper in input format for fast comparison
private transient KeyWrapper keyWrapper;
- // Standard key wrapper in standard format for output
- private transient KeyWrapper standardKeyWrapper;
-
- // Maximum number of rows
- private transient int rowLimit;
-
- // Current number of rows
- private transient int rowSize;
-
- // Rows
- private transient Object[] rows;
-
/** Kryo ctor. */
public TopNKeyOperator() {
super();
@@ -103,7 +89,8 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializab
}
ObjectInspector rowInspector = inputObjInspectors[0];
- outputObjInspector = ObjectInspectorUtils.getStandardObjectInspector(rowInspector);
+ ObjectInspector standardObjInspector = ObjectInspectorUtils.getStandardObjectInspector(rowInspector);
+ outputObjInspector = rowInspector;
// init keyFields
int numKeys = conf.getKeyColumns().size();
@@ -117,25 +104,26 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializab
keyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
standardKeyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
- standardKeyObjectInspectors[i] = standardKeyFields[i].initialize(outputObjInspector);
+ standardKeyObjectInspectors[i] = standardKeyFields[i].initialize(standardObjInspector);
}
priorityQueue = new PriorityQueue<>(topN + 1, new TopNKeyOperator.KeyWrapperComparator(
standardKeyObjectInspectors, standardKeyObjectInspectors, columnSortOrderIsDesc));
- keyWrapper = new KeyWrapperFactory(keyFields, keyObjectInspectors,
- standardKeyObjectInspectors).getKeyWrapper();
- standardKeyWrapper = new KeyWrapperFactory(standardKeyFields, standardKeyObjectInspectors,
- standardKeyObjectInspectors).getKeyWrapper();
-
- rowLimit = VectorizedRowBatch.DEFAULT_SIZE;
- rows = new Object[rowLimit];
- rowSize = 0;
+ KeyWrapperFactory keyWrapperFactory = new KeyWrapperFactory(keyFields, keyObjectInspectors,
+ standardKeyObjectInspectors);
+ keyWrapper = keyWrapperFactory.getKeyWrapper();
}
@Override
public void process(Object row, int tag) throws HiveException {
- keyWrapper.getNewKey(row, inputObjInspectors[0]);
+ if (canProcess(row, tag)) {
+ forward(row, outputObjInspector);
+ }
+ }
+
+ protected boolean canProcess(Object row, int tag) throws HiveException {
+ keyWrapper.getNewKey(row, inputObjInspectors[tag]);
keyWrapper.setHashKey();
if (!priorityQueue.contains(keyWrapper)) {
@@ -145,32 +133,12 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializab
priorityQueue.poll();
}
- rows[rowSize] = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[0]);
- rowSize++;
-
- if (rowSize % rowLimit == 0) {
- processRows();
- }
- }
-
- private void processRows() throws HiveException {
- for (int i = 0; i < rowSize; i++) {
- Object row = rows[i];
-
- standardKeyWrapper.getNewKey(row, outputObjInspector);
- standardKeyWrapper.setHashKey();
-
- if (priorityQueue.contains(standardKeyWrapper)) {
- forward(row, outputObjInspector);
- }
- }
- priorityQueue.clear();
- rowSize = 0;
+ return priorityQueue.contains(keyWrapper);
}
@Override
protected final void closeOp(boolean abort) throws HiveException {
- processRows();
+ priorityQueue.clear();
super.closeOp(abort);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/513ee73b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
index 6f29f88..e28afee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.exec.vector;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.primitives.Ints;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -30,50 +28,23 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
import org.apache.hadoop.hive.ql.plan.VectorDesc;
import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-import java.util.Properties;
-
-import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
/**
* VectorTopNKeyOperator passes only rows that contain top N keys.
*/
-public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements VectorizationOperator {
+public class VectorTopNKeyOperator extends TopNKeyOperator implements VectorizationOperator {
private static final long serialVersionUID = 1L;
private VectorTopNKeyDesc vectorDesc;
private VectorizationContext vContext;
- // Key column info
- private int[] keyColumnNums;
- private TypeInfo[] keyTypeInfos;
-
// Extract row
- private transient Object[] singleRow;
+ private transient Object[] extractedRow;
private transient VectorExtractRow vectorExtractRow;
- // Serialization
- private transient BinarySortableSerDe binarySortableSerDe;
- private transient StructObjectInspector keyObjectInspector;
-
// Batch processing
- private transient boolean firstBatch;
- private transient PriorityQueue<Writable> priorityQueue;
private transient int[] temporarySelected;
public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
@@ -83,16 +54,6 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
this.conf = (TopNKeyDesc) conf;
this.vContext = vContext;
this.vectorDesc = (VectorTopNKeyDesc) vectorDesc;
-
- VectorExpression[] keyExpressions = this.vectorDesc.getKeyExpressions();
- final int numKeys = keyExpressions.length;
- keyColumnNums = new int[numKeys];
- keyTypeInfos = new TypeInfo[numKeys];
-
- for (int i = 0; i < numKeys; i++) {
- keyColumnNums[i] = keyExpressions[i].getOutputColumnNum();
- keyTypeInfos[i] = keyExpressions[i].getOutputTypeInfo();
- }
}
/** Kryo ctor. */
@@ -114,20 +75,10 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
keyExpression.init(hconf);
}
- this.firstBatch = true;
-
- VectorExpression[] keyExpressions = vectorDesc.getKeyExpressions();
- final int size = keyExpressions.length;
- ObjectInspector[] fieldObjectInspectors = new ObjectInspector[size];
-
- for (int i = 0; i < size; i++) {
- VectorExpression keyExpression = keyExpressions[i];
- fieldObjectInspectors[i] = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
- keyExpression.getOutputTypeInfo());
- }
-
- keyObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
- this.conf.getKeyColumnNames(), Arrays.asList(fieldObjectInspectors));
+ vectorExtractRow = new VectorExtractRow();
+ vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0],
+ vContext.getProjectedColumns());
+ extractedRow = new Object[vectorExtractRow.getCount()];
temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
}
@@ -148,63 +99,6 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
keyExpression.evaluate(batch);
}
- if (firstBatch) {
- vectorExtractRow = new VectorExtractRow();
- vectorExtractRow.init(keyObjectInspector, Ints.asList(keyColumnNums));
-
- singleRow = new Object[vectorExtractRow.getCount()];
- Comparator comparator = Comparator.reverseOrder();
- priorityQueue = new PriorityQueue<Writable>(comparator);
-
- try {
- binarySortableSerDe = new BinarySortableSerDe();
- Properties properties = new Properties();
- Joiner joiner = Joiner.on(',');
- properties.setProperty(serdeConstants.LIST_COLUMNS, joiner.join(conf.getKeyColumnNames()));
- properties.setProperty(serdeConstants.LIST_COLUMN_TYPES, joiner.join(keyTypeInfos));
- properties.setProperty(serdeConstants.SERIALIZATION_SORT_ORDER,
- conf.getColumnSortOrder());
- binarySortableSerDe.initialize(getConfiguration(), properties);
- } catch (SerDeException e) {
- throw new HiveException(e);
- }
-
- firstBatch = false;
- }
-
- // Clear the priority queue
- priorityQueue.clear();
-
- // Get top n keys
- for (int i = 0; i < batch.size; i++) {
-
- // Get keys
- int j;
- if (batch.selectedInUse) {
- j = batch.selected[i];
- } else {
- j = i;
- }
- vectorExtractRow.extractRow(batch, j, singleRow);
-
- Writable keysWritable;
- try {
- keysWritable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
- } catch (SerDeException e) {
- throw new HiveException(e);
- }
-
- // Put the copied keys into the priority queue
- if (!priorityQueue.contains(keysWritable)) {
- priorityQueue.offer(WritableUtils.clone(keysWritable, getConfiguration()));
- }
-
- // Limit the queue size
- if (priorityQueue.size() > conf.getTopN()) {
- priorityQueue.poll();
- }
- }
-
// Filter rows with top n keys
int size = 0;
int[] selected = new int[batch.selected.length];
@@ -217,17 +111,10 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
}
// Get keys
- vectorExtractRow.extractRow(batch, j, singleRow);
-
- Writable keysWritable;
- try {
- keysWritable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
- } catch (SerDeException e) {
- throw new HiveException(e);
- }
+ vectorExtractRow.extractRow(batch, j, extractedRow);
// Select a row in the priority queue
- if (priorityQueue.contains(keysWritable)) {
+ if (canProcess(extractedRow, tag)) {
selected[size++] = j;
}
}
@@ -251,16 +138,6 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
}
@Override
- public String getName() {
- return TopNKeyOperator.getOperatorName();
- }
-
- @Override
- public OperatorType getType() {
- return TOPNKEY;
- }
-
- @Override
public VectorizationContext getInputVectorizationContext() {
return vContext;
}
@@ -270,30 +147,6 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
return vectorDesc;
}
- // Because a TopNKeyOperator works like a FilterOperator with top n key condition, its properties
- // for optimizers has same values. Following methods are same with FilterOperator;
- // supportSkewJoinOptimization, columnNamesRowResolvedCanBeObtained,
- // supportAutomaticSortMergeJoin, and supportUnionRemoveOptimization.
- @Override
- public boolean supportSkewJoinOptimization() {
- return true;
- }
-
- @Override
- public boolean columnNamesRowResolvedCanBeObtained() {
- return true;
- }
-
- @Override
- public boolean supportAutomaticSortMergeJoin() {
- return true;
- }
-
- @Override
- public boolean supportUnionRemoveOptimization() {
- return true;
- }
-
// Must send on to VectorPTFOperator...
@Override
public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException {