You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/12 20:52:46 UTC
[1/2] hive git commit: HIVE-14433 : refactor LLAP plan cache
avoidance and fix issue in merge processor (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/branch-2.1 b8903b36b -> bdf4ef890
refs/heads/master 4b3507652 -> d97e4e2c9
HIVE-14433 : refactor LLAP plan cache avoidance and fix issue in merge processor (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d97e4e2c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d97e4e2c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d97e4e2c
Branch: refs/heads/master
Commit: d97e4e2c9bdd292f433173e9cce7445e9916e64d
Parents: 4b35076
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Aug 12 13:33:39 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Aug 12 13:33:46 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java | 2 +-
.../org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java | 5 +++--
.../apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java | 5 +----
.../hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java | 5 ++---
.../hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java | 8 +-------
5 files changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 99cdaa0..416606e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -145,7 +145,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
// the task in. On MR: The cache is a no-op.
String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
- cache = ObjectCacheFactory.getCache(hconf, queryId);
+ cache = ObjectCacheFactory.getCache(hconf, queryId, false);
loader = getHashTableLoader(hconf);
hashMapRowGetters = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 5201120..5a19030 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -44,10 +44,11 @@ public class ObjectCacheFactory {
/**
* Returns the appropriate cache
*/
- public static ObjectCache getCache(Configuration conf, String queryId) {
+ public static ObjectCache getCache(Configuration conf, String queryId, boolean isPlanCache) {
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
if (LlapProxy.isDaemon()) { // daemon
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)
+ && !isPlanCache) {
// LLAP object cache, unlike others, does not use globals. Thus, get the existing one.
return getLlapObjectCache(queryId);
} else { // no cache
http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 0886c0e..6f36dfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -96,11 +95,9 @@ public class MapRecordProcessor extends RecordProcessor {
super(jconf, context);
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
if (LlapProxy.isDaemon()) {
- cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); // do not cache plan
setLlapOfFragmentId(context);
- } else {
- cache = ObjectCacheFactory.getCache(jconf, queryId);
}
+ cache = ObjectCacheFactory.getCache(jconf, queryId, true);
execContext = new ExecMapperContext(jconf);
execContext.setJc(jconf);
cacheKeys = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index ec97856..b7f1011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -63,7 +63,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
private MergeFileWork mfWork;
MRInputLegacy mrInput = null;
private final Object[] row = new Object[2];
- ObjectCache cache;
+ org.apache.hadoop.hive.ql.exec.ObjectCache cache;
public MergeFileRecordProcessor(final JobConf jconf, final ProcessorContext context) {
super(jconf, context);
@@ -95,8 +95,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
}
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
- .getCache(jconf, queryId);
+ cache = ObjectCacheFactory.getCache(jconf, queryId, true);
try {
execContext.setJc(jconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 1390a00..cf3c8ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -89,14 +89,8 @@ public class ReduceRecordProcessor extends RecordProcessor{
public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
super(jconf, context);
- ObjectCache cache;
-
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- if (LlapProxy.isDaemon()) { // don't cache plan
- cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
- } else {
- cache = ObjectCacheFactory.getCache(jconf, queryId);
- }
+ cache = ObjectCacheFactory.getCache(jconf, queryId, true);
String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
cacheKeys = Lists.newArrayList(cacheKey);
[2/2] hive git commit: HIVE-14433 : refactor LLAP plan cache
avoidance and fix issue in merge processor (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-14433 : refactor LLAP plan cache avoidance and fix issue in merge processor (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Conflicts:
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bdf4ef89
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bdf4ef89
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bdf4ef89
Branch: refs/heads/branch-2.1
Commit: bdf4ef8900ebf67fcb984469bb997c4f783bce89
Parents: b8903b3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Aug 12 13:33:39 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Aug 12 13:43:52 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java | 2 +-
.../org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java | 5 +++--
.../apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java | 7 ++-----
.../hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java | 5 ++---
.../hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java | 8 +-------
5 files changed, 9 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 99cdaa0..416606e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -145,7 +145,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
// the task in. On MR: The cache is a no-op.
String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
- cache = ObjectCacheFactory.getCache(hconf, queryId);
+ cache = ObjectCacheFactory.getCache(hconf, queryId, false);
loader = getHashTableLoader(hconf);
hashMapRowGetters = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 5201120..5a19030 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -44,10 +44,11 @@ public class ObjectCacheFactory {
/**
* Returns the appropriate cache
*/
- public static ObjectCache getCache(Configuration conf, String queryId) {
+ public static ObjectCache getCache(Configuration conf, String queryId, boolean isPlanCache) {
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
if (LlapProxy.isDaemon()) { // daemon
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)
+ && !isPlanCache) {
// LLAP object cache, unlike others, does not use globals. Thus, get the existing one.
return getLlapObjectCache(queryId);
} else { // no cache
http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 1e92f0a..7c8ea9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -94,14 +93,12 @@ public class MapRecordProcessor extends RecordProcessor {
public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
super(jconf, context);
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- if (LlapProxy.isDaemon()) { // do not cache plan
+ if (LlapProxy.isDaemon()) {
String id = queryId + "_" + context.getTaskIndex();
l4j.info("LLAP_OF_ID: "+id);
jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
- cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
- } else {
- cache = ObjectCacheFactory.getCache(jconf, queryId);
}
+ cache = ObjectCacheFactory.getCache(jconf, queryId, true);
execContext = new ExecMapperContext(jconf);
execContext.setJc(jconf);
cacheKeys = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index ec97856..b7f1011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -63,7 +63,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
private MergeFileWork mfWork;
MRInputLegacy mrInput = null;
private final Object[] row = new Object[2];
- ObjectCache cache;
+ org.apache.hadoop.hive.ql.exec.ObjectCache cache;
public MergeFileRecordProcessor(final JobConf jconf, final ProcessorContext context) {
super(jconf, context);
@@ -95,8 +95,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
}
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
- .getCache(jconf, queryId);
+ cache = ObjectCacheFactory.getCache(jconf, queryId, true);
try {
execContext.setJc(jconf);
http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 1390a00..cf3c8ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -89,14 +89,8 @@ public class ReduceRecordProcessor extends RecordProcessor{
public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
super(jconf, context);
- ObjectCache cache;
-
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
- if (LlapProxy.isDaemon()) { // don't cache plan
- cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
- } else {
- cache = ObjectCacheFactory.getCache(jconf, queryId);
- }
+ cache = ObjectCacheFactory.getCache(jconf, queryId, true);
String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
cacheKeys = Lists.newArrayList(cacheKey);