Posted to commits@hive.apache.org by li...@apache.org on 2017/11/20 02:10:26 UTC

hive git commit: HIVE-17964: HoS: some spark configs don't require re-creating a session (Rui reviewed by Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master f1698b62c -> 966d2b303


HIVE-17964: HoS: some spark configs don't require re-creating a session (Rui reviewed by Xuefu)
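
Before this change, HiveConf treated any property whose name starts with
"hive.spark" as a Remote Spark Context property, so changing any such setting
forced Hive to tear down and re-create the Spark session. After this change,
only the properties enumerated in the new hive.spark.rsc.conf.list setting
trigger re-creation. A minimal sketch of the updated check (isRscProperty is a
hypothetical helper name for illustration; in the patch the check is inlined
in HiveConf, as the diff below shows):

    // Only an exact match against an entry of hive.spark.rsc.conf.list
    // forces a new Spark session; other hive.spark.* properties no longer do.
    static boolean isRscProperty(String name, java.util.List<String> rscList) {
      return rscList.stream().anyMatch(rscVar -> rscVar.equals(name));
    }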


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/966d2b30
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/966d2b30
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/966d2b30

Branch: refs/heads/master
Commit: 966d2b303b473e46a2877c13a58497b98c0896d5
Parents: f1698b6
Author: Rui Li <li...@apache.org>
Authored: Mon Nov 20 10:10:21 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Mon Nov 20 10:10:21 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   | 22 ++++++++++-
 data/scripts/sleep.py                           | 23 ++++++++++++
 .../clientnegative/spark_job_max_tasks.q        |  8 +++-
 .../clientnegative/spark_stage_max_tasks.q      |  8 +++-
 .../spark/spark_job_max_tasks.q.out             | 39 ++++++++++++--------
 .../spark/spark_stage_max_tasks.q.out           | 39 ++++++++++++--------
 6 files changed, 102 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1a1d50c..0cc8de0 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -91,6 +91,7 @@ public class HiveConf extends Configuration {
   private static final Map<String, ConfVars> metaConfs = new HashMap<String, ConfVars>();
   private final List<String> restrictList = new ArrayList<String>();
   private final Set<String> hiddenSet = new HashSet<String>();
+  private final List<String> rscList = new ArrayList<>();
 
   private Pattern modWhiteListPattern = null;
   private volatile boolean isSparkConfigUpdated = false;
@@ -3580,6 +3581,7 @@ public class HiveConf extends Configuration {
             "hive.spark.client.secret.bits," +
             "hive.spark.client.rpc.server.address," +
             "hive.spark.client.rpc.server.port," +
+            "hive.spark.client.rpc.sasl.mechanisms," +
             "bonecp.,"+
             "hive.druid.broker.address.default,"+
             "hive.druid.coordinator.address.default,"+
@@ -3600,6 +3602,12 @@ public class HiveConf extends Configuration {
         "hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
         "Comma separated list of variables which are used internally and should not be configurable."),
 
+    HIVE_SPARK_RSC_CONF_LIST("hive.spark.rsc.conf.list",
+        SPARK_OPTIMIZE_SHUFFLE_SERDE.varname + "," +
+            SPARK_CLIENT_FUTURE_TIMEOUT.varname,
+        "Comma separated list of variables which are related to remote spark context.\n" +
+            "Changing these variables will result in re-creating the spark session."),
+
     HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s",
         new TimeValidator(TimeUnit.SECONDS),
         "Timeout for Running Query in seconds. A nonpositive value means infinite. " +
@@ -3927,7 +3935,7 @@ public class HiveConf extends Configuration {
       if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
         result = true;
       }
-    } else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
+    } else if (rscList.stream().anyMatch(rscVar -> rscVar.equals(name))) { // Remote Spark Context property.
       result = true;
     } else if (name.equals("mapreduce.job.queuename")) {
       // a special property starting with mapreduce that we would also like to effect if it changes
@@ -4409,6 +4417,7 @@ public class HiveConf extends Configuration {
     setupRestrictList();
     hiddenSet.clear();
     hiddenSet.addAll(HiveConfUtil.getHiddenSet(this));
+    setupRSCList();
   }
 
   /**
@@ -4799,6 +4808,17 @@ public class HiveConf extends Configuration {
     restrictList.add(ConfVars.HIVE_CONF_RESTRICTED_LIST.varname);
     restrictList.add(ConfVars.HIVE_CONF_HIDDEN_LIST.varname);
     restrictList.add(ConfVars.HIVE_CONF_INTERNAL_VARIABLE_LIST.varname);
+    restrictList.add(ConfVars.HIVE_SPARK_RSC_CONF_LIST.varname);
+  }
+
+  private void setupRSCList() {
+    rscList.clear();
+    String vars = this.getVar(ConfVars.HIVE_SPARK_RSC_CONF_LIST);
+    if (vars != null) {
+      for (String var : vars.split(",")) {
+        rscList.add(var.trim());
+      }
+    }
   }
 
   /**
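
For context, the effect of the new list-based check can be demonstrated
standalone (a sketch, not the committed code; the class name and the literal
property values are assumptions for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RscListDemo {
      public static void main(String[] args) {
        // Mirrors setupRSCList(): split the comma separated value of
        // hive.spark.rsc.conf.list and trim each entry.
        String vars = "hive.spark.optimize.shuffle.serde, hive.spark.client.future.timeout";
        List<String> rscList = new ArrayList<>();
        for (String var : vars.split(",")) {
          rscList.add(var.trim());
        }
        // Mirrors the updated check: only an exact match counts.
        for (String name : Arrays.asList(
            "hive.spark.client.future.timeout",   // listed -> re-create session
            "hive.spark.job.max.tasks")) {        // unlisted -> keep session
          boolean recreate = rscList.stream().anyMatch(v -> v.equals(name));
          System.out.println(name + " -> recreate session: " + recreate);
        }
      }
    }

Note also that hive.spark.rsc.conf.list is itself added to the restricted list
above, so clients cannot override which properties force session re-creation.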

http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/data/scripts/sleep.py
----------------------------------------------------------------------
diff --git a/data/scripts/sleep.py b/data/scripts/sleep.py
new file mode 100644
index 0000000..342f14c
--- /dev/null
+++ b/data/scripts/sleep.py
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys
+import time
+
+for line in sys.stdin.readlines():
+    time.sleep(3)
\ No newline at end of file
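
The new script is intentionally slow and produces no output: it sleeps three
seconds for every line read from stdin, presumably so the negative tests below
keep their Spark tasks running long enough for the job monitor to observe the
task counts and kill the query.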

http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_job_max_tasks.q b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
index 7473050..a638f83 100644
--- a/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
+++ b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
@@ -1,6 +1,10 @@
 set hive.spark.job.max.tasks=2;
 
+add file ../../data/scripts/sleep.py;
+
 EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
 
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
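
Both this test and spark_stage_max_tasks.q below switch from a plain
aggregation to a TRANSFORM through sleep.py. The subquery's GROUP BY plus the
outer ORDER BY still yield the multi-stage Spark job the task limits are meant
to trip, while the sleeping transform keeps the job running long enough for
the limit to be enforced mid-flight.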

http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q b/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
index 5bdb014..fd43b67 100644
--- a/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
+++ b/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
@@ -1,6 +1,10 @@
 set hive.spark.stage.max.tasks=1;
 
+add file ../../data/scripts/sleep.py;
+
 EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
 
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;

http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
index ba2f09e..b259b63 100644
--- a/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
+++ b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
@@ -1,8 +1,10 @@
 PREHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 PREHOOK: type: QUERY
 POSTHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -22,39 +24,43 @@ STAGE PLANS:
                   alias: src1
                   Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
+                    expressions: key (type: string)
+                    outputColumnNames: key
                     Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                     Group By Operator
-                      aggregations: sum(value)
                       keys: key (type: string)
                       mode: hash
-                      outputColumnNames: _col0, _col1
+                      outputColumnNames: _col0
                       Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                       Reduce Output Operator
                         key expressions: _col0 (type: string)
                         sort order: +
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: double)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator
-                aggregations: sum(VALUE._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1
+                outputColumnNames: _col0
                 Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col1 (type: double)
-                  sort order: +
+                Transform Operator
+                  command: python sleep.py
+                  output info:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col0 (type: string)
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: string)
         Reducer 3 
             Reduce Operator Tree:
               Select Operator
-                expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
-                outputColumnNames: _col0, _col1
+                expressions: VALUE._col0 (type: string)
+                outputColumnNames: _col0
                 Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
@@ -70,7 +76,8 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: query: SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src1
 #### A masked pattern was here ####

http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
index ba2f09e..b259b63 100644
--- a/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
+++ b/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
@@ -1,8 +1,10 @@
 PREHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 PREHOOK: type: QUERY
 POSTHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
@@ -22,39 +24,43 @@ STAGE PLANS:
                   alias: src1
                   Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
+                    expressions: key (type: string)
+                    outputColumnNames: key
                     Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                     Group By Operator
-                      aggregations: sum(value)
                       keys: key (type: string)
                       mode: hash
-                      outputColumnNames: _col0, _col1
+                      outputColumnNames: _col0
                       Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
                       Reduce Output Operator
                         key expressions: _col0 (type: string)
                         sort order: +
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: double)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator
-                aggregations: sum(VALUE._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
-                outputColumnNames: _col0, _col1
+                outputColumnNames: _col0
                 Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col1 (type: double)
-                  sort order: +
+                Transform Operator
+                  command: python sleep.py
+                  output info:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                   Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col0 (type: string)
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col0 (type: string)
         Reducer 3 
             Reduce Operator Tree:
               Select Operator
-                expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
-                outputColumnNames: _col0, _col1
+                expressions: VALUE._col0 (type: string)
+                outputColumnNames: _col0
                 Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
                 File Output Operator
                   compressed: false
@@ -70,7 +76,8 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: query: SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+  FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src1
 #### A masked pattern was here ####