You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/07/23 04:05:54 UTC

[hive] branch master updated (c279768 -> 51346a0)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git.


    from c279768  HIVE-23849: Hive skips the creation of ColumnAccessInfo when creating a view (Barnabas Maidics, reviewed by Peter Vary, Jesus Camacho Rodriguez)
     new 380be9a  HIVE-23843: Improve key evictions in VectorGroupByOperator (Rajesh Balamohan via Ashutosh Chauhan, Zoltan Haindrich)
     new 51346a0  HIVE-23870: Optimise multiple text conversions in WritableHiveCharObjectInspector.getPrimitiveJavaObject / HiveCharWritable (Rajesh Balamohan via Ashutosh Chauhan, David Mollitor)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ql/exec/vector/VectorAggregationBufferRow.java |  12 +-
 .../hive/ql/exec/vector/VectorGroupByOperator.java |  53 +++++++--
 .../ql/exec/vector/TestVectorGroupByOperator.java  | 125 ++++++++++++++++++---
 .../hive/serde2/io/HiveBaseCharWritable.java       |   8 +-
 .../hadoop/hive/serde2/io/HiveCharWritable.java    |  10 +-
 .../hadoop/hive/serde2/io/HiveVarcharWritable.java |   2 +
 6 files changed, 186 insertions(+), 24 deletions(-)


[hive] 02/02: HIVE-23870: Optimise multiple text conversions in WritableHiveCharObjectInspector.getPrimitiveJavaObject / HiveCharWritable (Rajesh Balamohan via Ashutosh Chauhan, David Mollitor)

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit 51346a0935acfca410c4858c7d4367e27a075392
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Mon Jul 20 15:19:41 2020 +0530

    HIVE-23870: Optimise multiple text conversions in WritableHiveCharObjectInspector.getPrimitiveJavaObject / HiveCharWritable (Rajesh Balamohan via Ashutosh Chauhan, David Mollitor)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../org/apache/hadoop/hive/serde2/io/HiveBaseCharWritable.java |  8 +++++++-
 .../org/apache/hadoop/hive/serde2/io/HiveCharWritable.java     | 10 +++++++++-
 .../org/apache/hadoop/hive/serde2/io/HiveVarcharWritable.java  |  2 ++
 3 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveBaseCharWritable.java b/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveBaseCharWritable.java
index 5b7b3b4..c4bd6ff 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveBaseCharWritable.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveBaseCharWritable.java
@@ -27,12 +27,17 @@ import org.apache.hive.common.util.HiveStringUtils;
 
 public abstract class HiveBaseCharWritable {
   protected Text value = new Text();
+  protected int charLength = -1;
 
   public HiveBaseCharWritable() {
   }
 
   public int getCharacterLength() {
-    return HiveStringUtils.getTextUtfLength(value);
+    if (charLength != -1) {
+      return charLength;
+    }
+    charLength = HiveStringUtils.getTextUtfLength(value);
+    return charLength;
   }
 
   /**
@@ -45,6 +50,7 @@ public abstract class HiveBaseCharWritable {
 
   public void readFields(DataInput in) throws IOException {
     value.readFields(in);
+    charLength = -1;
   }
 
   public void write(DataOutput out) throws IOException {
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveCharWritable.java b/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveCharWritable.java
index 5cc10a8..ea3b8e5 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveCharWritable.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveCharWritable.java
@@ -53,6 +53,7 @@ public class HiveCharWritable extends HiveBaseCharWritable
 
   public void set(HiveCharWritable val) {
     value.set(val.value);
+    charLength = -1;
   }
 
   public void set(HiveCharWritable val, int maxLength) {
@@ -78,6 +79,9 @@ public class HiveCharWritable extends HiveBaseCharWritable
   }
 
   public Text getStrippedValue() {
+    if (value.charAt(value.getLength() - 1) != ' ') {
+      return value;
+    }
     // A lot of these methods could be done more efficiently by operating on the Text value
     // directly, rather than converting to HiveChar.
     return new Text(getHiveChar().getStrippedValue());
@@ -88,7 +92,11 @@ public class HiveCharWritable extends HiveBaseCharWritable
   }
 
   public int getCharacterLength() {
-    return HiveStringUtils.getTextUtfLength(getStrippedValue());
+    if (charLength != -1) {
+      return charLength;
+    }
+    charLength = HiveStringUtils.getTextUtfLength(getStrippedValue());
+    return charLength;
   }
 
   public int compareTo(HiveCharWritable rhs) {
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveVarcharWritable.java b/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveVarcharWritable.java
index 796c533..c3812d6 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveVarcharWritable.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/io/HiveVarcharWritable.java
@@ -45,6 +45,7 @@ public class HiveVarcharWritable extends HiveBaseCharWritable
 
   public void set(HiveVarcharWritable val) {
     value.set(val.value);
+    charLength = val.charLength;
   }
 
   public void set(HiveVarcharWritable val, int maxLength) {
@@ -57,6 +58,7 @@ public class HiveVarcharWritable extends HiveBaseCharWritable
 
   public void set(String val, int maxLength) {
     value.set(HiveBaseChar.enforceMaxLength(val, maxLength));
+    charLength = maxLength;
   }
 
   public HiveVarchar getHiveVarchar() {


[hive] 01/02: HIVE-23843: Improve key evictions in VectorGroupByOperator (Rajesh Balamohan via Ashutosh Chauhan, Zoltan Haindrich)

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit 380be9afd1a364fe0ff83e61e17ba4ced12f29a0
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Tue Jul 14 10:00:14 2020 +0530

    HIVE-23843: Improve key evictions in VectorGroupByOperator (Rajesh Balamohan via Ashutosh Chauhan, Zoltan Haindrich)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../ql/exec/vector/VectorAggregationBufferRow.java |  12 +-
 .../hive/ql/exec/vector/VectorGroupByOperator.java |  53 +++++++--
 .../ql/exec/vector/TestVectorGroupByOperator.java  | 125 ++++++++++++++++++---
 3 files changed, 168 insertions(+), 22 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
index 494db35..a7ef154 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAggregationBufferRow.java
@@ -28,7 +28,8 @@ public class VectorAggregationBufferRow {
   private VectorAggregateExpression.AggregationBuffer[] aggregationBuffers;
   private int version;
   private int index;
-  
+  private int accessed = 0;
+
   public VectorAggregationBufferRow(
       VectorAggregateExpression.AggregationBuffer[] aggregationBuffers) {
     this.aggregationBuffers = aggregationBuffers;
@@ -80,5 +81,12 @@ public class VectorAggregationBufferRow {
       aggregationBuffers[i].reset();
     }
   }
-  
+
+  public int getAccessCount() {
+    return accessed;
+  }
+
+  public void incrementAccessCount() {
+    accessed++;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 9f81e8e..85535f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -151,6 +151,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   private float memoryThreshold;
 
   private boolean isLlap = false;
+
+  // Tracks the total access count of the entries currently held in the map aggregation buffers.
+  private long totalAccessCount;
+
   /**
    * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
@@ -251,7 +255,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
    * This mode is very simple, there are no keys to consider, and only flushes one row at closing
    * The one row must flush even if no input was seen (NULLs)
    */
-  private class ProcessingModeGlobalAggregate extends ProcessingModeBase {
+  final class ProcessingModeGlobalAggregate extends ProcessingModeBase {
 
     /**
      * In global processing mode there is only one set of aggregation buffers
@@ -288,12 +292,13 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   /**
    * Hash Aggregate mode processing
    */
-  private class ProcessingModeHashAggregate extends ProcessingModeBase {
+   final class ProcessingModeHashAggregate extends ProcessingModeBase {
 
     /**
      * The global key-aggregation hash map.
      */
-    private Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
+    @VisibleForTesting
+    Map<KeyWrapper, VectorAggregationBufferRow> mapKeysAggregationBuffers;
 
     /**
      * Total per hashtable entry fixed memory (does not depend on key/agg values).
@@ -334,7 +339,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
     /**
      * A soft reference used to detect memory pressure
      */
-    private SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
+    @VisibleForTesting
+    SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
 
     /**
      * Counts the number of time the gcCanary died and was resurrected
@@ -387,10 +393,19 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
       sumBatchSize = 0;
 
       mapKeysAggregationBuffers = new HashMap<KeyWrapper, VectorAggregationBufferRow>();
+      if (groupingSets != null && groupingSets.length > 0) {
+        this.maxHtEntries = this.maxHtEntries / groupingSets.length;
+        LOG.info("New maxHtEntries: {}, groupingSets len: {}", maxHtEntries, groupingSets.length);
+      }
       computeMemoryLimits();
       LOG.debug("using hash aggregation processing mode");
     }
 
+    @VisibleForTesting
+    int getMaxHtEntries() {
+      return maxHtEntries;
+    }
+
     @Override
     public void doProcessBatch(VectorizedRowBatch batch, boolean isFirstGroupingSet,
         boolean[] currentGroupingSetsOverrideIsNulls) throws HiveException {
@@ -502,6 +517,10 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
           mapKeysAggregationBuffers.put(kw.copyKey(), aggregationBuffer);
           numEntriesHashTable++;
           numEntriesSinceCheck++;
+        } else {
+          // for access tracking
+          aggregationBuffer.incrementAccessCount();
+          totalAccessCount++;
         }
         aggregationBatchInfo.mapAggregationBufferSet(aggregationBuffer, i);
       }
@@ -540,6 +559,16 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
       }
     }
 
+    int computeAvgAccess() {
+      if (numEntriesHashTable == 0) {
+        return 0;
+      }
+      int avgAccess = (int) (totalAccessCount / numEntriesHashTable);
+      LOG.debug("totalAccessCount:{}, numEntries:{}, avgAccess:{}",
+          totalAccessCount, numEntriesHashTable, avgAccess);
+      return avgAccess;
+    }
+
     /**
      * Flushes the entries in the hash table by emiting output (forward).
      * When parameter 'all' is true all the entries are flushed.
@@ -561,6 +590,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
             maxHashTblMemory/1024/1024,
             gcCanary.get() == null ? "dead" : "alive"));
       }
+      int avgAccess = computeAvgAccess();
 
       /* Iterate the global (keywrapper,aggregationbuffers) map and emit
        a row for each key */
@@ -568,10 +598,17 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
           mapKeysAggregationBuffers.entrySet().iterator();
       while(iter.hasNext()) {
         Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next();
+        if (!all && avgAccess >= 1) {
+          // Retain entries whose access count is greater than the average access count
+          if (pair.getValue().getAccessCount() > avgAccess) {
+            continue;
+          }
+        }
 
         writeSingleRow((VectorHashKeyWrapperBase) pair.getKey(), pair.getValue());
 
         if (!all) {
+          totalAccessCount -= pair.getValue().getAccessCount();
           iter.remove();
           --numEntriesHashTable;
           if (++entriesFlushed >= entriesToFlush) {
@@ -582,6 +619,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
 
       if (all) {
         mapKeysAggregationBuffers.clear();
+        totalAccessCount = 0;
         numEntriesHashTable = 0;
       }
 
@@ -674,7 +712,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
    * Streaming processing mode on ALREADY GROUPED data. Each input VectorizedRowBatch may
    * have a mix of different keys.  Intermediate values are flushed each time key changes.
    */
-  private class ProcessingModeStreaming extends ProcessingModeBase {
+  final class ProcessingModeStreaming extends ProcessingModeBase {
 
     /**
      * The aggregation buffers used in streaming mode
@@ -816,7 +854,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
    *      writeGroupRow does this and finally increments outputBatch.size.
    *
    */
-  private class ProcessingModeReduceMergePartial extends ProcessingModeBase {
+  final class ProcessingModeReduceMergePartial extends ProcessingModeBase {
 
     private boolean first;
     private boolean isLastGroupBatch;
@@ -896,7 +934,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc>
   /**
    * Current processing mode. Processing mode can change (eg. hash -> streaming).
    */
-  private transient IProcessingMode processingMode;
+  @VisibleForTesting
+  transient IProcessingMode processingMode;
 
   private static final long serialVersionUID = 1L;
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
index 3835987..c22a833 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorGroupByOperator.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -39,9 +38,9 @@ import java.util.Set;
 import org.apache.calcite.util.Pair;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.KeyWrapper;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
@@ -76,8 +75,6 @@ import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BooleanWritable;
@@ -499,6 +496,31 @@ public class TestVectorGroupByOperator {
       }
     });
 
+    // vrb of 1 row each
+    FakeVectorRowBatchFromObjectIterables data = getDataForRollup();
+
+    long countRowsProduced = 0;
+    for (VectorizedRowBatch unit: data) {
+      // after 24 rows, we'd have seen all the keys
+      // find 14 keys in the hashmap
+      // but 24*0.5 = 12
+      // won't turn off hash mode because of the 3 grouping sets
+      // if it turns off the hash mode, we'd get 14 + 3*(100-24) rows
+      countRowsProduced += unit.size;
+      vgo.process(unit,  0);
+
+      if (countRowsProduced >= 100) {
+        break;
+      }
+
+    }
+    vgo.close(false);
+    // all groupings
+    // 10 keys generates 14 rows with the rollup
+    assertEquals(1+3+10, outputRowCount);
+  }
+
+  FakeVectorRowBatchFromObjectIterables getDataForRollup() throws HiveException {
     // k1 has nDV of 2
     Iterable<Object> k1 = new Iterable<Object>() {
       @Override
@@ -578,33 +600,110 @@ public class TestVectorGroupByOperator {
     };
 
     // vrb of 1 row each
-    FakeVectorRowBatchFromObjectIterables data = new FakeVectorRowBatchFromObjectIterables(
+    return new FakeVectorRowBatchFromObjectIterables(
         2,
         new String[] {"long", "long", "long", "long"},
         k1,
         k2,
         v,
         v); // output col
+  }
+
+  @Test
+  public void testRollupAggregationWithFlush() throws HiveException {
+
+    List<String> mapColumnNames = new ArrayList<String>();
+    mapColumnNames.add("k1");
+    mapColumnNames.add("k2");
+    mapColumnNames.add("v");
+    VectorizationContext ctx = new VectorizationContext("name", mapColumnNames);
+
+    // select count(v) from name group by rollup (k1,k2);
+
+    Pair<GroupByDesc,VectorGroupByDesc> pair = buildKeyGroupByDesc (ctx, "count",
+        "v", TypeInfoFactory.longTypeInfo,
+        new String[] { "k1", "k2" },
+        new TypeInfo[] {TypeInfoFactory.longTypeInfo, TypeInfoFactory.longTypeInfo});
+    GroupByDesc desc = pair.left;
+    VectorGroupByDesc vectorDesc = pair.right;
+
+    desc.setGroupingSetsPresent(true);
+    ArrayList<Long> groupingSets = new ArrayList<>();
+    // groupingSets
+    groupingSets.add(0L);
+    groupingSets.add(1L);
+    groupingSets.add(2L);
+    desc.setListGroupingSets(groupingSets);
+    // add grouping sets dummy key
+    ExprNodeDesc groupingSetDummyKey = new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, 0L);
+    // this only works because we used an arraylist in buildKeyGroupByDesc
+    // don't do this in actual compiler
+    desc.getKeys().add(groupingSetDummyKey);
+    // groupingSet Position
+    desc.setGroupingSetPosition(2);
+
+    CompilationOpContext cCtx = new CompilationOpContext();
+
+    desc.setMinReductionHashAggr(0.5f);
+    // Set really low check interval setting
+    hconf.set("hive.groupby.mapaggr.checkinterval", "10");
+    hconf.set("hive.vectorized.groupby.checkinterval", "10");
+
+    Operator<? extends OperatorDesc> groupByOp = OperatorFactory.get(cCtx, desc);
+
+    VectorGroupByOperator vgo =
+        (VectorGroupByOperator) Vectorizer.vectorizeGroupByOperator(groupByOp, ctx, vectorDesc);
+
+    FakeCaptureVectorToRowOutputOperator out = FakeCaptureVectorToRowOutputOperator.addCaptureOutputChild(cCtx, vgo);
+    vgo.initialize(hconf, null);
+
+    //Get the processing mode
+    VectorGroupByOperator.ProcessingModeHashAggregate processingMode =
+        (VectorGroupByOperator.ProcessingModeHashAggregate) vgo.processingMode;
+    assertEquals(333333,
+        ((VectorGroupByOperator.ProcessingModeHashAggregate)vgo.processingMode).getMaxHtEntries());
+
+    this.outputRowCount = 0;
+    out.setOutputInspector(new FakeCaptureVectorToRowOutputOperator.OutputInspector() {
+      @Override
+      public void inspectRow(Object row, int tag) throws HiveException {
+        ++outputRowCount;
+      }
+    });
+
+    FakeVectorRowBatchFromObjectIterables data = getDataForRollup();
 
     long countRowsProduced = 0;
+    long numElementsToBeRetained = 0;
+    int avgAccess = 0;
     for (VectorizedRowBatch unit: data) {
-      // after 24 rows, we'd have seen all the keys
-      // find 14 keys in the hashmap
-      // but 24*0.5 = 12
-      // won't turn off hash mode because of the 3 grouping sets
-      // if it turns off the hash mode, we'd get 14 + 3*(100-24) rows
       countRowsProduced += unit.size;
       vgo.process(unit,  0);
 
       if (countRowsProduced >= 100) {
+        // note down avg access
+        avgAccess = processingMode.computeAvgAccess();
+        numElementsToBeRetained = getElementsHigherThan(processingMode.mapKeysAggregationBuffers, avgAccess);
+        // trigger flush explicitly on next iteration
+        processingMode.gcCanary.clear();
         break;
       }
+    }
+
+    // This processing would trigger flush
+    for (VectorizedRowBatch unit: data) {
+      vgo.process(unit,  0);
+      long freqElementsAfterFlush = getElementsHigherThan(processingMode.mapKeysAggregationBuffers, avgAccess);
 
+      assertTrue("After flush: " + freqElementsAfterFlush + ", before flush: " + numElementsToBeRetained,
+          (freqElementsAfterFlush >= numElementsToBeRetained));
+      break;
     }
     vgo.close(false);
-    // all groupings
-    // 10 keys generates 14 rows with the rollup
-    assertEquals(1+3+10, outputRowCount);
+  }
+
+  long getElementsHigherThan(Map<KeyWrapper, VectorAggregationBufferRow> aggMap, int avgAccess) {
+    return aggMap.values().stream().filter(v -> (v.getAccessCount() > avgAccess)).count();
   }
 
   @Test