You are viewing a plain text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by jc...@apache.org on 2019/07/25 16:05:32 UTC

[hive] branch master updated: HIVE-21828: Tez: Use a pre-parsed TezConfiguration from DagUtils (Attila Magyar, reviewed by Jesus Camacho Rodriguez)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ef21f4  HIVE-21828: Tez: Use a pre-parsed TezConfiguration from DagUtils (Attila Magyar, reviewed by Jesus Camacho Rodriguez)
9ef21f4 is described below

commit 9ef21f447a34535d00103a5affb74f940d2d5ecf
Author: Attila Magyar <am...@hortonworks.com>
AuthorDate: Thu Jul 25 09:05:01 2019 -0700

    HIVE-21828: Tez: Use a pre-parsed TezConfiguration from DagUtils (Attila Magyar, reviewed by Jesus Camacho Rodriguez)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  6 +--
 .../apache/hadoop/hive/ql/exec/tez/DagUtils.java   | 22 +++++++-
 .../hive/ql/exec/tez/TezConfigurationFactory.java  | 63 ++++++++++++++++++++++
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java    | 10 ++--
 .../hive/ql/udf/generic/GenericUDFRegExp.java      |  2 +-
 .../hadoop/hive/ql/exec/tez/TestTezTask.java       |  3 +-
 .../apache/hive/testutils/HiveTestEnvSetup.java    | 18 +++++++
 ql/src/test/queries/clientpositive/mm_loaddata.q   |  1 +
 .../llap/tez_fixed_bucket_pruning.q.out            |  8 +--
 9 files changed, 118 insertions(+), 15 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 2b7468a..a9fcf37 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5432,7 +5432,7 @@ public class HiveConf extends Configuration {
 
   public static void setVar(Configuration conf, ConfVars var, String val) {
     assert (var.valClass == String.class) : var.varname;
-    conf.set(var.varname, val);
+    conf.set(var.varname, val, "setVar");
   }
   public static void setVar(Configuration conf, ConfVars var, String val,
     EncoderDecoder<String, String> encoderDecoder) {
@@ -5541,7 +5541,7 @@ public class HiveConf extends Configuration {
     origProp = getAllProperties();
 
     // Overlay the ConfVars. Note that this ignores ConfVars with null values
-    addResource(getConfVarInputStream());
+    addResource(getConfVarInputStream(), "HiveConf.java");
 
     // Overlay hive-site.xml if it exists
     if (hiveSiteURL != null) {
@@ -5654,8 +5654,8 @@ public class HiveConf extends Configuration {
     if (whiteListParamsStr == null || whiteListParamsStr.trim().isEmpty()) {
       // set the default configs in whitelist
       whiteListParamsStr = getSQLStdAuthDefaultWhiteListPattern();
+      setVar(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST, whiteListParamsStr);
     }
-    setVar(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST, whiteListParamsStr);
   }
 
   private static String getSQLStdAuthDefaultWhiteListPattern() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 3278dfe..6055967 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.zip.ZipOutputStream;
@@ -1344,9 +1345,27 @@ public class DagUtils {
    * @throws IOException
    */
   public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
+    return createConfiguration(hiveConf, false);
+  }
+
+  /**
+   * Creates and initializes a JobConf object that can be used to execute
+   * the DAG. This can skip the configs which are already included in AM configs.
+   * @param hiveConf Current conf for the execution
+   * @param skipAMConf Skip the configs where are already set across all DAGs 
+   * @return JobConf base configuration for job execution
+   * @throws IOException
+   */
+  public JobConf createConfiguration(HiveConf hiveConf, boolean skipAMConf) throws IOException {
     hiveConf.setBoolean("mapred.mapper.new-api", false);
 
-    JobConf conf = new JobConf(new TezConfiguration(hiveConf));
+    Predicate<String> findDefaults =
+        (s) -> ((s != null) && (s.endsWith(".xml") || (s.endsWith(".java") && !"HiveConf.java".equals(s))));
+
+    // since this is an inclusion filter, negate the predicate
+    JobConf conf =
+        TezConfigurationFactory
+            .wrapWithJobConf(hiveConf, skipAMConf ? findDefaults.negate() : null);
 
     conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
 
@@ -1364,6 +1383,7 @@ public class DagUtils {
     // Removing job credential entry/ cannot be set on the tasks
     conf.unset("mapreduce.job.credentials.binary");
 
+    // TODO: convert this to a predicate too
     hiveConf.stripHiddenConfigurations(conf);
     return conf;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezConfigurationFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezConfigurationFactory.java
new file mode 100644
index 0000000..f33debe
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezConfigurationFactory.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+public class TezConfigurationFactory {
+
+  public static Configuration copyInto(Configuration target, Configuration src,
+      Predicate<String> sourceFilter) {
+    Iterator<Map.Entry<String, String>> iter = src.iterator();
+    while (iter.hasNext()) {
+      Map.Entry<String, String> entry = iter.next();
+      String name = entry.getKey();
+      String value = entry.getValue();
+      String[] sources = src.getPropertySources(name);
+      final String source;
+      if (sources == null || sources.length == 0) {
+        source = null;
+      } else {
+        /*
+         * If the property or its source wasn't found. Otherwise, returns a list of the sources of
+         * the resource. The older sources are the first ones in the list.
+         */
+        source = sources[sources.length - 1];
+      }
+
+      if (sourceFilter == null || sourceFilter.test(source)) {
+        target.set(name, value);
+      } else {
+      }
+    }
+    return target;
+  }
+
+  public static JobConf wrapWithJobConf(Configuration conf, Predicate<String> sourceFilter) {
+    JobConf jc = new JobConf(false);
+    copyInto(jc, conf, sourceFilter);
+    return jc;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index dd7ccd4..b32b6b3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -174,8 +174,10 @@ public class TezTask extends Task<TezWork> {
           ss.getHiveVariables().get("wmpool"), ss.getHiveVariables().get("wmapp"));
 
       WmContext wmContext = ctx.getWmContext();
-      // jobConf will hold all the configuration for hadoop, tez, and hive
-      JobConf jobConf = utils.createConfiguration(conf);
+      // jobConf will hold all the configuration for hadoop, tez, and hive, which are not set in AM defaults
+      JobConf jobConf = utils.createConfiguration(conf, true);
+
+
       // Get all user jars from work (e.g. input format stuff).
       String[] allNonConfFiles = work.configureJobConfAndExtractJars(jobConf);
       // DAG scratch dir. We get a session from the pool so it may be different from Tez one.
@@ -217,7 +219,7 @@ public class TezTask extends Task<TezWork> {
         if (this.isShutdown) {
           throw new HiveException("Operation cancelled");
         }
-        DAGClient dagClient = submit(jobConf, dag, sessionRef);
+        DAGClient dagClient = submit(dag, sessionRef);
         session = sessionRef.value;
         boolean wasShutdown = false;
         synchronized (dagClientLock) {
@@ -532,7 +534,7 @@ public class TezTask extends Task<TezWork> {
     return newSession;
   }
 
-  DAGClient submit(JobConf conf, DAG dag, Ref<TezSessionState> sessionStateRef) throws Exception {
+  DAGClient submit(DAG dag, Ref<TezSessionState> sessionStateRef) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
     DAGClient dagClient = null;
     TezSessionState sessionState = sessionStateRef.value;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFRegExp.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFRegExp.java
index 3bf3cfd..8522abd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFRegExp.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFRegExp.java
@@ -64,7 +64,7 @@ public class GenericUDFRegExp extends GenericUDF {
   @Override
   public void configure(MapredContext context) {
     if (context != null) {
-      if(context.getJobConf().get("hive.use.googleregex.engine").equals("true")){
+      if(HiveConf.getBoolVar(context.getJobConf(), HiveConf.ConfVars.HIVEUSEGOOGLEREGEXENGINE)){
         this.useGoogleRegexEngine=true;
       }
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index befeb4f..4e4a979 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -28,7 +28,6 @@ import org.apache.hive.common.util.Ref;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -220,7 +219,7 @@ public class TestTezTask {
   @Test
   public void testSubmit() throws Exception {
     DAG dag = DAG.create("test");
-    task.submit(conf, dag, Ref.from(sessionState));
+    task.submit(dag, Ref.from(sessionState));
     // validate close/reopen
     verify(sessionState, times(1)).reopen();
     verify(session, times(2)).submitDAG(any(DAG.class));
diff --git a/ql/src/test/org/apache/hive/testutils/HiveTestEnvSetup.java b/ql/src/test/org/apache/hive/testutils/HiveTestEnvSetup.java
index f872da0..efab408 100644
--- a/ql/src/test/org/apache/hive/testutils/HiveTestEnvSetup.java
+++ b/ql/src/test/org/apache/hive/testutils/HiveTestEnvSetup.java
@@ -64,6 +64,11 @@ import com.google.common.collect.Sets;
  */
 public class HiveTestEnvSetup extends ExternalResource {
 
+  private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
+    File.separator + HiveTestEnvSetup.class.getCanonicalName()
+    + "-" + System.currentTimeMillis()
+  ).getPath().replaceAll("\\\\", "/");
+
   static interface IHiveTestRule {
     default void beforeClass(HiveTestEnvContext ctx) throws Exception {
     }
@@ -222,12 +227,25 @@ public class HiveTestEnvSetup extends ExternalResource {
       HadoopShims shims = ShimLoader.getHadoopShims();
       mr1 = shims.getLocalMiniTezCluster(ctx.hiveConf, true);
       mr1.setupConfiguration(ctx.hiveConf);
+      setupTez(ctx.hiveConf);
     }
 
     @Override
     public void afterClass(HiveTestEnvContext ctx) throws Exception {
       mr1.shutdown();
     }
+
+    private void setupTez(HiveConf conf) {
+      conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+      conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
+      conf.set("tez.am.resource.memory.mb", "128");
+      conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
+      conf.setBoolean("tez.local.mode", true);
+      conf.set("fs.defaultFS", "file:///");
+      conf.setBoolean("tez.runtime.optimize.local.fetch", true);
+      conf.set("tez.staging-dir", TEST_DATA_DIR);
+      conf.setBoolean("tez.ignore.lib.uris", true);
+    }
   }
 
   public static final String HIVE_ROOT = getHiveRoot();
diff --git a/ql/src/test/queries/clientpositive/mm_loaddata.q b/ql/src/test/queries/clientpositive/mm_loaddata.q
index 7e5787f..3a65213 100644
--- a/ql/src/test/queries/clientpositive/mm_loaddata.q
+++ b/ql/src/test/queries/clientpositive/mm_loaddata.q
@@ -6,6 +6,7 @@ set tez.grouping.max-size=2;
 set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set mapreduce.map.memory.mb=128;
 
 
 drop table load0_mm;
diff --git a/ql/src/test/results/clientpositive/llap/tez_fixed_bucket_pruning.q.out b/ql/src/test/results/clientpositive/llap/tez_fixed_bucket_pruning.q.out
index eaed60c..eecee34 100644
--- a/ql/src/test/results/clientpositive/llap/tez_fixed_bucket_pruning.q.out
+++ b/ql/src/test/results/clientpositive/llap/tez_fixed_bucket_pruning.q.out
@@ -738,7 +738,7 @@ STAGE PLANS:
                     serialization.ddl struct l3_monthly_dw_dimplan { i64 idp_warehouse_id, i64 idp_audit_id, date idp_data_date, i64 l3_snapshot_number, i64 plan_key, i64 project_key, i64 charge_code_key, i64 transclass_key, i64 resource_key, i64 finplan_detail_object_id, i64 project_object_id, i64 txn_class_object_id, i64 charge_code_object_id, i64 resoruce_object_id, varchar(1500) plan_name, varchar(500) plan_code, varchar(50) plan_type, varchar(50) period_type, varchar(3000) plan_desc [...]
                     serialization.format 1
                     serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                    totalSize 5242697
+                    totalSize 5233146
 #### A masked pattern was here ####
                   serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                 
@@ -761,7 +761,7 @@ STAGE PLANS:
                       serialization.ddl struct l3_monthly_dw_dimplan { i64 idp_warehouse_id, i64 idp_audit_id, date idp_data_date, i64 l3_snapshot_number, i64 plan_key, i64 project_key, i64 charge_code_key, i64 transclass_key, i64 resource_key, i64 finplan_detail_object_id, i64 project_object_id, i64 txn_class_object_id, i64 charge_code_object_id, i64 resoruce_object_id, varchar(1500) plan_name, varchar(500) plan_code, varchar(50) plan_type, varchar(50) period_type, varchar(3000) plan_de [...]
                       serialization.format 1
                       serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                      totalSize 5242697
+                      totalSize 5233146
 #### A masked pattern was here ####
                     serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                     name: default.l3_monthly_dw_dimplan
@@ -1247,7 +1247,7 @@ STAGE PLANS:
                     serialization.ddl struct l3_monthly_dw_dimplan { i64 idp_warehouse_id, i64 idp_audit_id, date idp_data_date, i64 l3_snapshot_number, i64 plan_key, i64 project_key, i64 charge_code_key, i64 transclass_key, i64 resource_key, i64 finplan_detail_object_id, i64 project_object_id, i64 txn_class_object_id, i64 charge_code_object_id, i64 resoruce_object_id, varchar(1500) plan_name, varchar(500) plan_code, varchar(50) plan_type, varchar(50) period_type, varchar(3000) plan_desc [...]
                     serialization.format 1
                     serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                    totalSize 5242697
+                    totalSize 5233146
 #### A masked pattern was here ####
                   serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                 
@@ -1270,7 +1270,7 @@ STAGE PLANS:
                       serialization.ddl struct l3_monthly_dw_dimplan { i64 idp_warehouse_id, i64 idp_audit_id, date idp_data_date, i64 l3_snapshot_number, i64 plan_key, i64 project_key, i64 charge_code_key, i64 transclass_key, i64 resource_key, i64 finplan_detail_object_id, i64 project_object_id, i64 txn_class_object_id, i64 charge_code_object_id, i64 resoruce_object_id, varchar(1500) plan_name, varchar(500) plan_code, varchar(50) plan_type, varchar(50) period_type, varchar(3000) plan_de [...]
                       serialization.format 1
                       serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                      totalSize 5242697
+                      totalSize 5233146
 #### A masked pattern was here ####
                     serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
                     name: default.l3_monthly_dw_dimplan