You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kl...@apache.org on 2020/08/11 08:39:03 UTC
[hive] branch master updated: HIVE-24001: Don't cache MapWork in
tez/ObjectCache during query-based compaction (Karen Coppage,
reviewed by Marta Kuczora)
This is an automated email from the ASF dual-hosted git repository.
klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 71c9af7 HIVE-24001: Don't cache MapWork in tez/ObjectCache during query-based compaction (Karen Coppage, reviewed by Marta Kuczora)
71c9af7 is described below
commit 71c9af7b8ead470520a6c3a4848be9c67eb80f10
Author: Karen Coppage <ka...@gmail.com>
AuthorDate: Tue Aug 11 10:38:46 2020 +0200
HIVE-24001: Don't cache MapWork in tez/ObjectCache during query-based compaction (Karen Coppage, reviewed by Marta Kuczora)
Closes #1368
---
.../apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java | 13 ++++++++++++-
.../apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java | 2 ++
.../hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java | 2 +-
.../hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java | 2 +-
4 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 8c9d53f..5cfa759 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -93,6 +94,8 @@ public class MapRecordProcessor extends RecordProcessor {
private final List<String> cacheKeys = new ArrayList<>();
private final List<String> dynamicValueCacheKeys = new ArrayList<>();
private final ObjectCache cache, dynamicValueCache;
+ // Whether this processor is running as part of query-based compaction.
+ private final boolean isInCompaction;
public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
super(jconf, context);
@@ -104,6 +107,8 @@ public class MapRecordProcessor extends RecordProcessor {
dynamicValueCache = ObjectCacheFactory.getCache(jconf, queryId, false, true);
execContext = new ExecMapperContext(jconf);
execContext.setJc(jconf);
+ isInCompaction = CompactorUtil.COMPACTOR.equalsIgnoreCase(
+ HiveConf.getVar(jconf, HiveConf.ConfVars.SPLIT_GROUPING_MODE));
}
private void setLlapOfFragmentId(final ProcessorContext context) {
@@ -126,7 +131,13 @@ public class MapRecordProcessor extends RecordProcessor {
// create map and fetch operators
- mapWork = cache.retrieve(key, () -> Utilities.getMapWork(jconf));
+ if (!isInCompaction) {
+ mapWork = cache.retrieve(key, () -> Utilities.getMapWork(jconf));
+ } else {
+ // During query-based compaction, do not retrieve a cached MapWork: each bucket needs a new mapper and a new
+ // validate_acid_sort_order UDF instance, otherwise validate_acid_sort_order will fail.
+ mapWork = Utilities.getMapWork(jconf);
+ }
// TODO HIVE-14042. Cleanup may be required if exiting early.
Utilities.setMapWork(jconf, mapWork);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
index 85c1bf6..28fc642 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorUtil.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class CompactorUtil {
+ public static final String COMPACTOR = "compactor";
+
public interface ThrowingRunnable<E extends Exception> {
void run() throws E;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 3df8ad7..ec81bfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -49,7 +49,7 @@ final class MajorQueryCompactor extends QueryCompactor {
* For now, we will group splits on tez so that we end up with all bucket files,
* with same bucket number in one map task.
*/
- conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
+ conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, CompactorUtil.COMPACTOR);
String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_";
String tmpTableName = tmpPrefix + System.currentTimeMillis();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index 810f150..79e5595 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -53,7 +53,7 @@ final class MinorQueryCompactor extends QueryCompactor {
// Set up the session for driver.
HiveConf conf = new HiveConf(hiveConf);
conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
- conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
+ conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, CompactorUtil.COMPACTOR);
conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS, false);
conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false);
String tmpTableName =