You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/12 20:52:46 UTC

[1/2] hive git commit: HIVE-14433 : refactor LLAP plan cache avoidance and fix issue in merge processor (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/branch-2.1 b8903b36b -> bdf4ef890
  refs/heads/master 4b3507652 -> d97e4e2c9


HIVE-14433 : refactor LLAP plan cache avoidance and fix issue in merge processor (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d97e4e2c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d97e4e2c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d97e4e2c

Branch: refs/heads/master
Commit: d97e4e2c9bdd292f433173e9cce7445e9916e64d
Parents: 4b35076
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Aug 12 13:33:39 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Aug 12 13:33:46 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java | 2 +-
 .../org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java   | 5 +++--
 .../apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java   | 5 +----
 .../hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java    | 5 ++---
 .../hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java       | 8 +-------
 5 files changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 99cdaa0..416606e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -145,7 +145,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
     // the task in. On MR: The cache is a no-op.
     String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
     cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
-    cache = ObjectCacheFactory.getCache(hconf, queryId);
+    cache = ObjectCacheFactory.getCache(hconf, queryId, false);
     loader = getHashTableLoader(hconf);
 
     hashMapRowGetters = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 5201120..5a19030 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -44,10 +44,11 @@ public class ObjectCacheFactory {
   /**
    * Returns the appropriate cache
    */
-  public static ObjectCache getCache(Configuration conf, String queryId) {
+  public static ObjectCache getCache(Configuration conf, String queryId, boolean isPlanCache) {
     if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       if (LlapProxy.isDaemon()) { // daemon
-        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)
+            && !isPlanCache) {
           // LLAP object cache, unlike others, does not use globals. Thus, get the existing one.
           return getLlapObjectCache(queryId);
         } else { // no cache

http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 0886c0e..6f36dfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -96,11 +95,9 @@ public class MapRecordProcessor extends RecordProcessor {
     super(jconf, context);
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     if (LlapProxy.isDaemon()) {
-      cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); // do not cache plan
       setLlapOfFragmentId(context);
-    } else {
-      cache = ObjectCacheFactory.getCache(jconf, queryId);
     }
+    cache = ObjectCacheFactory.getCache(jconf, queryId, true);
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
     cacheKeys = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index ec97856..b7f1011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -63,7 +63,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
   private MergeFileWork mfWork;
   MRInputLegacy mrInput = null;
   private final Object[] row = new Object[2];
-  ObjectCache cache;
+  org.apache.hadoop.hive.ql.exec.ObjectCache cache;
 
   public MergeFileRecordProcessor(final JobConf jconf, final ProcessorContext context) {
     super(jconf, context);
@@ -95,8 +95,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
     }
 
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
-      .getCache(jconf, queryId);
+    cache = ObjectCacheFactory.getCache(jconf, queryId, true);
 
     try {
       execContext.setJc(jconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/d97e4e2c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 1390a00..cf3c8ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -89,14 +89,8 @@ public class ReduceRecordProcessor  extends RecordProcessor{
   public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
 
-    ObjectCache cache;
-
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    if (LlapProxy.isDaemon()) { // don't cache plan
-      cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
-    } else {
-      cache = ObjectCacheFactory.getCache(jconf, queryId);
-    }
+    cache = ObjectCacheFactory.getCache(jconf, queryId, true);
 
     String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
     cacheKeys = Lists.newArrayList(cacheKey);


[2/2] hive git commit: HIVE-14433 : refactor LLAP plan cache avoidance and fix issue in merge processor (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Posted by se...@apache.org.
HIVE-14433 : refactor LLAP plan cache avoidance and fix issue in merge processor (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Conflicts:
	ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bdf4ef89
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bdf4ef89
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bdf4ef89

Branch: refs/heads/branch-2.1
Commit: bdf4ef8900ebf67fcb984469bb997c4f783bce89
Parents: b8903b3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Aug 12 13:33:39 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Aug 12 13:43:52 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java | 2 +-
 .../org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java   | 5 +++--
 .../apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java   | 7 ++-----
 .../hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java    | 5 ++---
 .../hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java       | 8 +-------
 5 files changed, 9 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 99cdaa0..416606e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -145,7 +145,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
     // the task in. On MR: The cache is a no-op.
     String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID);
     cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container";
-    cache = ObjectCacheFactory.getCache(hconf, queryId);
+    cache = ObjectCacheFactory.getCache(hconf, queryId, false);
     loader = getHashTableLoader(hconf);
 
     hashMapRowGetters = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
index 5201120..5a19030 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
@@ -44,10 +44,11 @@ public class ObjectCacheFactory {
   /**
    * Returns the appropriate cache
    */
-  public static ObjectCache getCache(Configuration conf, String queryId) {
+  public static ObjectCache getCache(Configuration conf, String queryId, boolean isPlanCache) {
     if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       if (LlapProxy.isDaemon()) { // daemon
-        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)) {
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OBJECT_CACHE_ENABLED)
+            && !isPlanCache) {
           // LLAP object cache, unlike others, does not use globals. Thus, get the existing one.
           return getLlapObjectCache(queryId);
         } else { // no cache

http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 1e92f0a..7c8ea9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -94,14 +93,12 @@ public class MapRecordProcessor extends RecordProcessor {
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    if (LlapProxy.isDaemon()) { // do not cache plan
+    if (LlapProxy.isDaemon()) {
       String id = queryId + "_" + context.getTaskIndex();
       l4j.info("LLAP_OF_ID: "+id);
       jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
-      cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
-    } else {
-      cache = ObjectCacheFactory.getCache(jconf, queryId);
     }
+    cache = ObjectCacheFactory.getCache(jconf, queryId, true);
     execContext = new ExecMapperContext(jconf);
     execContext.setJc(jconf);
     cacheKeys = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
index ec97856..b7f1011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
@@ -63,7 +63,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
   private MergeFileWork mfWork;
   MRInputLegacy mrInput = null;
   private final Object[] row = new Object[2];
-  ObjectCache cache;
+  org.apache.hadoop.hive.ql.exec.ObjectCache cache;
 
   public MergeFileRecordProcessor(final JobConf jconf, final ProcessorContext context) {
     super(jconf, context);
@@ -95,8 +95,7 @@ public class MergeFileRecordProcessor extends RecordProcessor {
     }
 
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    org.apache.hadoop.hive.ql.exec.ObjectCache cache = ObjectCacheFactory
-      .getCache(jconf, queryId);
+    cache = ObjectCacheFactory.getCache(jconf, queryId, true);
 
     try {
       execContext.setJc(jconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/bdf4ef89/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 1390a00..cf3c8ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -89,14 +89,8 @@ public class ReduceRecordProcessor  extends RecordProcessor{
   public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception {
     super(jconf, context);
 
-    ObjectCache cache;
-
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    if (LlapProxy.isDaemon()) { // don't cache plan
-      cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
-    } else {
-      cache = ObjectCacheFactory.getCache(jconf, queryId);
-    }
+    cache = ObjectCacheFactory.getCache(jconf, queryId, true);
 
     String cacheKey = processorContext.getTaskVertexName() + REDUCE_PLAN_KEY;
     cacheKeys = Lists.newArrayList(cacheKey);