You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2017/11/20 02:10:26 UTC
hive git commit: HIVE-17964: HoS: some Spark configs don't require
re-creating a session (Rui reviewed by Xuefu)
Repository: hive
Updated Branches:
refs/heads/master f1698b62c -> 966d2b303
HIVE-17964: HoS: some Spark configs don't require re-creating a session (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/966d2b30
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/966d2b30
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/966d2b30
Branch: refs/heads/master
Commit: 966d2b303b473e46a2877c13a58497b98c0896d5
Parents: f1698b6
Author: Rui Li <li...@apache.org>
Authored: Mon Nov 20 10:10:21 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Mon Nov 20 10:10:21 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 22 ++++++++++-
data/scripts/sleep.py | 23 ++++++++++++
.../clientnegative/spark_job_max_tasks.q | 8 +++-
.../clientnegative/spark_stage_max_tasks.q | 8 +++-
.../spark/spark_job_max_tasks.q.out | 39 ++++++++++++--------
.../spark/spark_stage_max_tasks.q.out | 39 ++++++++++++--------
6 files changed, 102 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1a1d50c..0cc8de0 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -91,6 +91,7 @@ public class HiveConf extends Configuration {
private static final Map<String, ConfVars> metaConfs = new HashMap<String, ConfVars>();
private final List<String> restrictList = new ArrayList<String>();
private final Set<String> hiddenSet = new HashSet<String>();
+ private final List<String> rscList = new ArrayList<>();
private Pattern modWhiteListPattern = null;
private volatile boolean isSparkConfigUpdated = false;
@@ -3580,6 +3581,7 @@ public class HiveConf extends Configuration {
"hive.spark.client.secret.bits," +
"hive.spark.client.rpc.server.address," +
"hive.spark.client.rpc.server.port," +
+ "hive.spark.client.rpc.sasl.mechanisms," +
"bonecp.,"+
"hive.druid.broker.address.default,"+
"hive.druid.coordinator.address.default,"+
@@ -3600,6 +3602,12 @@ public class HiveConf extends Configuration {
"hive.added.files.path,hive.added.jars.path,hive.added.archives.path",
"Comma separated list of variables which are used internally and should not be configurable."),
+ HIVE_SPARK_RSC_CONF_LIST("hive.spark.rsc.conf.list",
+ SPARK_OPTIMIZE_SHUFFLE_SERDE.varname + "," +
+ SPARK_CLIENT_FUTURE_TIMEOUT.varname,
+ "Comma separated list of variables which are related to remote spark context.\n" +
+ "Changing these variables will result in re-creating the spark session."),
+
HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s",
new TimeValidator(TimeUnit.SECONDS),
"Timeout for Running Query in seconds. A nonpositive value means infinite. " +
@@ -3927,7 +3935,7 @@ public class HiveConf extends Configuration {
if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
result = true;
}
- } else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
+ } else if (rscList.stream().anyMatch(rscVar -> rscVar.equals(name))) { // Remote Spark Context property.
result = true;
} else if (name.equals("mapreduce.job.queuename")) {
// a special property starting with mapreduce that we would also like to effect if it changes
@@ -4409,6 +4417,7 @@ public class HiveConf extends Configuration {
setupRestrictList();
hiddenSet.clear();
hiddenSet.addAll(HiveConfUtil.getHiddenSet(this));
+ setupRSCList();
}
/**
@@ -4799,6 +4808,17 @@ public class HiveConf extends Configuration {
restrictList.add(ConfVars.HIVE_CONF_RESTRICTED_LIST.varname);
restrictList.add(ConfVars.HIVE_CONF_HIDDEN_LIST.varname);
restrictList.add(ConfVars.HIVE_CONF_INTERNAL_VARIABLE_LIST.varname);
+ restrictList.add(ConfVars.HIVE_SPARK_RSC_CONF_LIST.varname);
+ }
+
+ private void setupRSCList() {
+ rscList.clear();
+ String vars = this.getVar(ConfVars.HIVE_SPARK_RSC_CONF_LIST);
+ if (vars != null) {
+ for (String var : vars.split(",")) {
+ rscList.add(var.trim());
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/data/scripts/sleep.py
----------------------------------------------------------------------
diff --git a/data/scripts/sleep.py b/data/scripts/sleep.py
new file mode 100644
index 0000000..342f14c
--- /dev/null
+++ b/data/scripts/sleep.py
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import sys
+import time
+
+for line in sys.stdin.readlines():
+ time.sleep(3)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_job_max_tasks.q b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
index 7473050..a638f83 100644
--- a/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
+++ b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
@@ -1,6 +1,10 @@
set hive.spark.job.max.tasks=2;
+add file ../../data/scripts/sleep.py;
+
EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q b/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
index 5bdb014..fd43b67 100644
--- a/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
+++ b/ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
@@ -1,6 +1,10 @@
set hive.spark.stage.max.tasks=1;
+add file ../../data/scripts/sleep.py;
+
EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k;
http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
index ba2f09e..b259b63 100644
--- a/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
+++ b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
@@ -1,8 +1,10 @@
PREHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
PREHOOK: type: QUERY
POSTHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -22,39 +24,43 @@ STAGE PLANS:
alias: src1
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Select Operator
- expressions: key (type: string), value (type: string)
- outputColumnNames: key, value
+ expressions: key (type: string)
+ outputColumnNames: key
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Group By Operator
- aggregations: sum(value)
keys: key (type: string)
mode: hash
- outputColumnNames: _col0, _col1
+ outputColumnNames: _col0
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col1 (type: double)
Reducer 2
Reduce Operator Tree:
Group By Operator
- aggregations: sum(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
- outputColumnNames: _col0, _col1
+ outputColumnNames: _col0
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col1 (type: double)
- sort order: +
+ Transform Operator
+ command: python sleep.py
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col0 (type: string)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string)
Reducer 3
Reduce Operator Tree:
Select Operator
- expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
- outputColumnNames: _col0, _col1
+ expressions: VALUE._col0 (type: string)
+ outputColumnNames: _col0
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
@@ -70,7 +76,8 @@ STAGE PLANS:
Processor Tree:
ListSink
-PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: query: SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
PREHOOK: type: QUERY
PREHOOK: Input: default@src1
#### A masked pattern was here ####
http://git-wip-us.apache.org/repos/asf/hive/blob/966d2b30/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
index ba2f09e..b259b63 100644
--- a/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
+++ b/ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
@@ -1,8 +1,10 @@
PREHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
PREHOOK: type: QUERY
POSTHOOK: query: EXPLAIN
-SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -22,39 +24,43 @@ STAGE PLANS:
alias: src1
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Select Operator
- expressions: key (type: string), value (type: string)
- outputColumnNames: key, value
+ expressions: key (type: string)
+ outputColumnNames: key
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Group By Operator
- aggregations: sum(value)
keys: key (type: string)
mode: hash
- outputColumnNames: _col0, _col1
+ outputColumnNames: _col0
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col1 (type: double)
Reducer 2
Reduce Operator Tree:
Group By Operator
- aggregations: sum(VALUE._col0)
keys: KEY._col0 (type: string)
mode: mergepartial
- outputColumnNames: _col0, _col1
+ outputColumnNames: _col0
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col1 (type: double)
- sort order: +
+ Transform Operator
+ command: python sleep.py
+ output info:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col0 (type: string)
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string)
Reducer 3
Reduce Operator Tree:
Select Operator
- expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
- outputColumnNames: _col0, _col1
+ expressions: VALUE._col0 (type: string)
+ outputColumnNames: _col0
Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
@@ -70,7 +76,8 @@ STAGE PLANS:
Processor Tree:
ListSink
-PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: query: SELECT TRANSFORM(key) USING 'python sleep.py' AS k
+ FROM (SELECT key FROM src1 GROUP BY key) a ORDER BY k
PREHOOK: type: QUERY
PREHOOK: Input: default@src1
#### A masked pattern was here ####