You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original archive page) is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by tc...@apache.org on 2018/08/17 13:34:44 UTC

hive git commit: HIVE-20368: Remove VectorTopNKeyOperator lock (Teddy Choi, reviewed by Jesus Camacho Rodriguez)

Repository: hive
Updated Branches:
  refs/heads/master ccdcc5e2e -> 513ee73b7


HIVE-20368: Remove VectorTopNKeyOperator lock (Teddy Choi, reviewed by Jesus Camacho Rodriguez)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/513ee73b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/513ee73b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/513ee73b

Branch: refs/heads/master
Commit: 513ee73b77a86c036fbcc424bfc5c70da817c98b
Parents: ccdcc5e
Author: Teddy Choi <pu...@gmail.com>
Authored: Fri Aug 17 22:31:09 2018 +0900
Committer: Teddy Choi <pu...@gmail.com>
Committed: Fri Aug 17 22:31:09 2018 +0900

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/TopNKeyOperator.java    |  62 ++-----
 .../ql/exec/vector/VectorTopNKeyOperator.java   | 163 +------------------
 2 files changed, 23 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/513ee73b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
index 3dfeeaf..4734824 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNKeyOperator.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
@@ -47,21 +46,8 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializab
   // Priority queue that holds occurred keys
   private transient PriorityQueue<KeyWrapper> priorityQueue;
 
-  // Fast key wrapper in input format for fast comparison
   private transient KeyWrapper keyWrapper;
 
-  // Standard key wrapper in standard format for output
-  private transient KeyWrapper standardKeyWrapper;
-
-  // Maximum number of rows
-  private transient int rowLimit;
-
-  // Current number of rows
-  private transient int rowSize;
-
-  // Rows
-  private transient Object[] rows;
-
   /** Kryo ctor. */
   public TopNKeyOperator() {
     super();
@@ -103,7 +89,8 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializab
     }
 
     ObjectInspector rowInspector = inputObjInspectors[0];
-    outputObjInspector = ObjectInspectorUtils.getStandardObjectInspector(rowInspector);
+    ObjectInspector standardObjInspector = ObjectInspectorUtils.getStandardObjectInspector(rowInspector);
+    outputObjInspector = rowInspector;
 
     // init keyFields
     int numKeys = conf.getKeyColumns().size();
@@ -117,25 +104,26 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializab
       keyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
       keyObjectInspectors[i] = keyFields[i].initialize(rowInspector);
       standardKeyFields[i] = ExprNodeEvaluatorFactory.get(key, hconf);
-      standardKeyObjectInspectors[i] = standardKeyFields[i].initialize(outputObjInspector);
+      standardKeyObjectInspectors[i] = standardKeyFields[i].initialize(standardObjInspector);
     }
 
     priorityQueue = new PriorityQueue<>(topN + 1, new TopNKeyOperator.KeyWrapperComparator(
         standardKeyObjectInspectors, standardKeyObjectInspectors, columnSortOrderIsDesc));
 
-    keyWrapper = new KeyWrapperFactory(keyFields, keyObjectInspectors,
-        standardKeyObjectInspectors).getKeyWrapper();
-    standardKeyWrapper = new KeyWrapperFactory(standardKeyFields, standardKeyObjectInspectors,
-        standardKeyObjectInspectors).getKeyWrapper();
-
-    rowLimit = VectorizedRowBatch.DEFAULT_SIZE;
-    rows = new Object[rowLimit];
-    rowSize = 0;
+    KeyWrapperFactory keyWrapperFactory = new KeyWrapperFactory(keyFields, keyObjectInspectors,
+        standardKeyObjectInspectors);
+    keyWrapper = keyWrapperFactory.getKeyWrapper();
   }
 
   @Override
   public void process(Object row, int tag) throws HiveException {
-    keyWrapper.getNewKey(row, inputObjInspectors[0]);
+    if (canProcess(row, tag)) {
+      forward(row, outputObjInspector);
+    }
+  }
+
+  protected boolean canProcess(Object row, int tag) throws HiveException {
+    keyWrapper.getNewKey(row, inputObjInspectors[tag]);
     keyWrapper.setHashKey();
 
     if (!priorityQueue.contains(keyWrapper)) {
@@ -145,32 +133,12 @@ public class TopNKeyOperator extends Operator<TopNKeyDesc> implements Serializab
       priorityQueue.poll();
     }
 
-    rows[rowSize] = ObjectInspectorUtils.copyToStandardObject(row, inputObjInspectors[0]);
-    rowSize++;
-
-    if (rowSize % rowLimit == 0) {
-      processRows();
-    }
-  }
-
-  private void processRows() throws HiveException {
-    for (int i = 0; i < rowSize; i++) {
-      Object row = rows[i];
-
-      standardKeyWrapper.getNewKey(row, outputObjInspector);
-      standardKeyWrapper.setHashKey();
-
-      if (priorityQueue.contains(standardKeyWrapper)) {
-        forward(row, outputObjInspector);
-      }
-    }
-    priorityQueue.clear();
-    rowSize = 0;
+    return priorityQueue.contains(keyWrapper);
   }
 
   @Override
   protected final void closeOp(boolean abort) throws HiveException {
-    processRows();
+    priorityQueue.clear();
     super.closeOp(abort);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/513ee73b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
index 6f29f88..e28afee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorTopNKeyOperator.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.primitives.Ints;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -30,50 +28,23 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.VectorDesc;
 import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-import java.util.Properties;
-
-import static org.apache.hadoop.hive.ql.plan.api.OperatorType.TOPNKEY;
 
 /**
  * VectorTopNKeyOperator passes rows that contains top N keys only.
  */
-public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements VectorizationOperator {
+public class VectorTopNKeyOperator extends TopNKeyOperator implements VectorizationOperator {
 
   private static final long serialVersionUID = 1L;
 
   private VectorTopNKeyDesc vectorDesc;
   private VectorizationContext vContext;
 
-  // Key column info
-  private int[] keyColumnNums;
-  private TypeInfo[] keyTypeInfos;
-
   // Extract row
-  private transient Object[] singleRow;
+  private transient Object[] extractedRow;
   private transient VectorExtractRow vectorExtractRow;
 
-  // Serialization
-  private transient BinarySortableSerDe binarySortableSerDe;
-  private transient StructObjectInspector keyObjectInspector;
-
   // Batch processing
-  private transient boolean firstBatch;
-  private transient PriorityQueue<Writable> priorityQueue;
   private transient int[] temporarySelected;
 
   public VectorTopNKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
@@ -83,16 +54,6 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
     this.conf = (TopNKeyDesc) conf;
     this.vContext = vContext;
     this.vectorDesc = (VectorTopNKeyDesc) vectorDesc;
-
-    VectorExpression[] keyExpressions = this.vectorDesc.getKeyExpressions();
-    final int numKeys = keyExpressions.length;
-    keyColumnNums = new int[numKeys];
-    keyTypeInfos = new TypeInfo[numKeys];
-
-    for (int i = 0; i < numKeys; i++) {
-      keyColumnNums[i] = keyExpressions[i].getOutputColumnNum();
-      keyTypeInfos[i] = keyExpressions[i].getOutputTypeInfo();
-    }
   }
 
   /** Kryo ctor. */
@@ -114,20 +75,10 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
       keyExpression.init(hconf);
     }
 
-    this.firstBatch = true;
-
-    VectorExpression[] keyExpressions = vectorDesc.getKeyExpressions();
-    final int size = keyExpressions.length;
-    ObjectInspector[] fieldObjectInspectors = new ObjectInspector[size];
-
-    for (int i = 0; i < size; i++) {
-      VectorExpression keyExpression = keyExpressions[i];
-      fieldObjectInspectors[i] = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(
-          keyExpression.getOutputTypeInfo());
-    }
-
-    keyObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-        this.conf.getKeyColumnNames(), Arrays.asList(fieldObjectInspectors));
+    vectorExtractRow = new VectorExtractRow();
+    vectorExtractRow.init((StructObjectInspector) inputObjInspectors[0],
+        vContext.getProjectedColumns());
+    extractedRow = new Object[vectorExtractRow.getCount()];
 
     temporarySelected = new int [VectorizedRowBatch.DEFAULT_SIZE];
   }
@@ -148,63 +99,6 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
       keyExpression.evaluate(batch);
     }
 
-    if (firstBatch) {
-      vectorExtractRow = new VectorExtractRow();
-      vectorExtractRow.init(keyObjectInspector, Ints.asList(keyColumnNums));
-
-      singleRow = new Object[vectorExtractRow.getCount()];
-      Comparator comparator = Comparator.reverseOrder();
-      priorityQueue = new PriorityQueue<Writable>(comparator);
-
-      try {
-        binarySortableSerDe = new BinarySortableSerDe();
-        Properties properties = new Properties();
-        Joiner joiner = Joiner.on(',');
-        properties.setProperty(serdeConstants.LIST_COLUMNS, joiner.join(conf.getKeyColumnNames()));
-        properties.setProperty(serdeConstants.LIST_COLUMN_TYPES, joiner.join(keyTypeInfos));
-        properties.setProperty(serdeConstants.SERIALIZATION_SORT_ORDER,
-            conf.getColumnSortOrder());
-        binarySortableSerDe.initialize(getConfiguration(), properties);
-      } catch (SerDeException e) {
-        throw new HiveException(e);
-      }
-
-      firstBatch = false;
-    }
-
-    // Clear the priority queue
-    priorityQueue.clear();
-
-    // Get top n keys
-    for (int i = 0; i < batch.size; i++) {
-
-      // Get keys
-      int j;
-      if (batch.selectedInUse) {
-        j = batch.selected[i];
-      } else {
-        j = i;
-      }
-      vectorExtractRow.extractRow(batch, j, singleRow);
-
-      Writable keysWritable;
-      try {
-        keysWritable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
-      } catch (SerDeException e) {
-        throw new HiveException(e);
-      }
-
-      // Put the copied keys into the priority queue
-      if (!priorityQueue.contains(keysWritable)) {
-        priorityQueue.offer(WritableUtils.clone(keysWritable, getConfiguration()));
-      }
-
-      // Limit the queue size
-      if (priorityQueue.size() > conf.getTopN()) {
-        priorityQueue.poll();
-      }
-    }
-
     // Filter rows with top n keys
     int size = 0;
     int[] selected = new int[batch.selected.length];
@@ -217,17 +111,10 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
       }
 
       // Get keys
-      vectorExtractRow.extractRow(batch, j, singleRow);
-
-      Writable keysWritable;
-      try {
-        keysWritable = binarySortableSerDe.serialize(singleRow, keyObjectInspector);
-      } catch (SerDeException e) {
-        throw new HiveException(e);
-      }
+      vectorExtractRow.extractRow(batch, j, extractedRow);
 
       // Select a row in the priority queue
-      if (priorityQueue.contains(keysWritable)) {
+      if (canProcess(extractedRow, tag)) {
         selected[size++] = j;
       }
     }
@@ -251,16 +138,6 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
   }
 
   @Override
-  public String getName() {
-    return TopNKeyOperator.getOperatorName();
-  }
-
-  @Override
-  public OperatorType getType() {
-    return TOPNKEY;
-  }
-
-  @Override
   public VectorizationContext getInputVectorizationContext() {
     return vContext;
   }
@@ -270,30 +147,6 @@ public class VectorTopNKeyOperator extends Operator<TopNKeyDesc> implements Vect
     return vectorDesc;
   }
 
-  // Because a TopNKeyOperator works like a FilterOperator with top n key condition, its properties
-  // for optimizers has same values. Following methods are same with FilterOperator;
-  // supportSkewJoinOptimization, columnNamesRowResolvedCanBeObtained,
-  // supportAutomaticSortMergeJoin, and supportUnionRemoveOptimization.
-  @Override
-  public boolean supportSkewJoinOptimization() {
-    return true;
-  }
-
-  @Override
-  public boolean columnNamesRowResolvedCanBeObtained() {
-    return true;
-  }
-
-  @Override
-  public boolean supportAutomaticSortMergeJoin() {
-    return true;
-  }
-
-  @Override
-  public boolean supportUnionRemoveOptimization() {
-    return true;
-  }
-
   // Must send on to VectorPTFOperator...
   @Override
   public void setNextVectorBatchGroupStatus(boolean isLastGroupBatch) throws HiveException {