Posted to commits@hive.apache.org by kl...@apache.org on 2020/08/11 08:39:03 UTC

[hive] branch master updated: HIVE-24001: Don't cache MapWork in tez/ObjectCache during query-based compaction (Karen Coppage, reviewed by Marta Kuczora)

This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 71c9af7  HIVE-24001: Don't cache MapWork in tez/ObjectCache during query-based compaction (Karen Coppage, reviewed by Marta Kuczora)
71c9af7 is described below

commit 71c9af7b8ead470520a6c3a4848be9c67eb80f10
Author: Karen Coppage <ka...@gmail.com>
AuthorDate: Tue Aug 11 10:38:46 2020 +0200

    HIVE-24001: Don't cache MapWork in tez/ObjectCache during query-based compaction (Karen Coppage, reviewed by Marta Kuczora)
    
    Closes #1368
---
 .../apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java  | 13 ++++++++++++-
 .../apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java  |  2 ++
 .../hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java   |  2 +-
 .../hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java   |  2 +-
 4 files changed, 16 insertions(+), 3 deletions(-)
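
In brief: Tez's ObjectCache keys cached objects by query ID and reuses them across
map tasks of the same query. During query-based compaction each bucket needs its own
MapWork (and with it a fresh validate_acid_sort_order UDF instance), so the cache is
bypassed. A minimal, self-contained sketch of that cache-or-compute-with-bypass
pattern (SimpleCache, loadMapWork and the query ID are illustrative stand-ins, not
Hive classes):

    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;

    class SimpleCache {
      private final Map<String, Object> store = new ConcurrentHashMap<>();

      // Cache-or-compute: return the cached value if present, else load and cache it.
      // (Simplified: a concurrent double-load is possible here, unlike in a real cache.)
      @SuppressWarnings("unchecked")
      <T> T retrieve(String key, Callable<T> loader) throws Exception {
        T value = (T) store.get(key);
        if (value == null) {
          value = loader.call();
          store.put(key, value);
        }
        return value;
      }
    }

    public class CacheBypassSketch {
      static final String COMPACTOR = "compactor";

      // Stand-in for Utilities.getMapWork(jconf).
      static String loadMapWork(String queryId) {
        return "MapWork for " + queryId;
      }

      public static void main(String[] args) throws Exception {
        SimpleCache cache = new SimpleCache();
        String splitGroupingMode = "compactor"; // would come from the job conf
        boolean isInCompaction = COMPACTOR.equalsIgnoreCase(splitGroupingMode);

        String mapWork;
        if (!isInCompaction) {
          // Normal path: reuse MapWork across map tasks of the same query.
          mapWork = cache.retrieve("query1__map", () -> loadMapWork("query1"));
        } else {
          // Compaction path: always build fresh MapWork.
          mapWork = loadMapWork("query1");
        }
        System.out.println(mapWork);
      }
    }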

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 8c9d53f..5cfa759 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -93,6 +94,8 @@ public class MapRecordProcessor extends RecordProcessor {
   private final List<String> cacheKeys = new ArrayList<>();
   private final List<String> dynamicValueCacheKeys = new ArrayList<>();
   private final ObjectCache cache, dynamicValueCache;
+  // Whether this processor is running as part of the query-based compaction process.
+  private final boolean isInCompaction;
 
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
@@ -104,6 +107,8 @@ public class MapRecordProcessor extends RecordProcessor {
     dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false, true);
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
+    isInCompaction = CompactorUtil.COMPACTOR.equalsIgnoreCase(
+        HiveConf.getVar(jconf, HiveConf.ConfVars.SPLIT_GROUPING_MODE));
   }
 
   private void setLlapOfFragmentId(final ProcessorContext context) {
@@ -126,7 +131,13 @@ public class MapRecordProcessor extends RecordProcessor {
 
 
     // create map and fetch operators
-    mapWork = cache.retrieve(key, () -> Utilities.getMapWork(jconf));
+    if (!isInCompaction) {
+      mapWork = cache.retrieve(key, () -> Utilities.getMapWork(jconf));
+    } else {
+      // During query-based compaction, we don't want to retrieve old MapWork from the cache: we need a new mapper
+      // and a new validate_acid_sort_order UDF instance for each bucket, otherwise validate_acid_sort_order will fail.
+      mapWork = Utilities.getMapWork(jconf);
+    }
     // TODO HIVE-14042. Cleanup may be required if exiting early.
     Utilities.setMapWork(jconf, mapWork);
 
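
The detection above hinges on a flag that the query compactors place on the job
configuration and the record processor reads back. A rough illustration of that
round-trip on a plain Hadoop Configuration ("hive.split.grouping.mode" is assumed
to be the property behind HiveConf.ConfVars.SPLIT_GROUPING_MODE; check HiveConf in
your Hive version):

    import org.apache.hadoop.conf.Configuration;

    public class SplitGroupingModeRoundTrip {
      // Assumed property name; verify against HiveConf.ConfVars.SPLIT_GROUPING_MODE.
      static final String SPLIT_GROUPING_MODE = "hive.split.grouping.mode";
      static final String COMPACTOR = "compactor";

      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Compactor side (Major/MinorQueryCompactor below): mark the job.
        conf.set(SPLIT_GROUPING_MODE, COMPACTOR);

        // Task side (MapRecordProcessor above): detect the flag, skip the cache.
        boolean isInCompaction =
            COMPACTOR.equalsIgnoreCase(conf.get(SPLIT_GROUPING_MODE, ""));
        System.out.println("isInCompaction = " + isInCompaction); // prints true
      }
    }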
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index 85c1bf6..28fc642 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
 public class CompactorUtil {
+  public static final String COMPACTOR = "compactor";
+
   public interface ThrowingRunnable<E extends Exception> {
     void run() throws E;
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 3df8ad7..ec81bfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -49,7 +49,7 @@ final class MajorQueryCompactor extends QueryCompactor {
      * For now, we will group splits on Tez so that we end up with all bucket files
      * with the same bucket number in one map task.
      */
-    conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
+    conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, CompactorUtil.COMPACTOR);
 
     String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
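
The comment in the hunk above refers to the compactor split-grouping strategy:
instead of grouping splits by size, all files with the same bucket number are
routed to one map task. A toy illustration of the idea (the file layout and the
grouping helper are invented for the example; Hive's real logic lives in its Tez
split grouper):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class BucketGroupingSketch {
      // ACID bucket files are named like "bucket_00000"; extract the bucket number.
      private static final Pattern BUCKET = Pattern.compile("bucket_(\\d+)");

      static Map<Integer, List<String>> groupByBucket(List<String> files) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String f : files) {
          Matcher m = BUCKET.matcher(f);
          if (m.find()) {
            groups.computeIfAbsent(Integer.parseInt(m.group(1)),
                b -> new ArrayList<>()).add(f);
          }
        }
        return groups;
      }

      public static void main(String[] args) {
        List<String> files = List.of(
            "delta_1_1/bucket_00000", "delta_2_2/bucket_00000",
            "delta_1_1/bucket_00001", "delta_2_2/bucket_00001");
        // Each map task would process one group, i.e. one bucket number.
        groupByBucket(files).forEach(
            (b, fs) -> System.out.println("bucket " + b + " -> " + fs));
      }
    }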
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index 810f150..79e5595 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -53,7 +53,7 @@ final class MinorQueryCompactor extends QueryCompactor {
     // Set up the session for driver.
     HiveConf conf = new HiveConf(hiveConf);
     conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
-    conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
+    conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, CompactorUtil.COMPACTOR);
     conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS, false);
     conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false);
     String tmpTableName =