Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 22:17:29 UTC
[12/50] [abbrv] hive git commit: HIVE-16552: Limit the number of tasks a Spark job may contain (Reviewed by Rui)
HIVE-16552: Limit the number of tasks a Spark job may contain (Reviewed by Rui)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c6b5ad66
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c6b5ad66
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c6b5ad66
Branch: refs/heads/hive-14535
Commit: c6b5ad663d235c15fc5bb5a24a1d3e9ac0d05140
Parents: 9e9356b
Author: Xuefu Zhang <xu...@uber.com>
Authored: Thu May 4 09:31:28 2017 -0700
Committer: Xuefu Zhang <xu...@uber.com>
Committed: Thu May 4 09:31:28 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../test/resources/testconfiguration.properties | 4 +-
.../hadoop/hive/cli/control/CliConfigs.java | 1 +
.../hadoop/hive/ql/exec/spark/SparkTask.java | 6 ++
.../spark/status/RemoteSparkJobMonitor.java | 15 +++-
.../ql/exec/spark/status/SparkJobMonitor.java | 10 ++-
.../clientnegative/spark_job_max_tasks.q | 6 ++
.../spark/spark_job_max_tasks.q.out | 77 ++++++++++++++++++++
8 files changed, 118 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 84398c6..99c26ce 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3350,6 +3350,8 @@ public class HiveConf extends Configuration {
"hive.spark.use.groupby.shuffle", true,
"Spark groupByKey transformation has better performance but uses unbounded memory." +
"Turn this off when there is a memory issue."),
+ SPARK_JOB_MAX_TASKS("hive.spark.job.max.tasks", -1, "The maximum number of tasks a Spark job may have.\n" +
+ "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."),
NWAYJOINREORDER("hive.reorder.nway.joins", true,
"Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
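For reference, the new entry follows HiveConf's usual (name, default, description) pattern and is read with getIntVar(), exactly as the later hunks do. A minimal, self-contained sketch (the class name MaxTasksConfExample is a hypothetical stand-in, not part of this patch):

import org.apache.hadoop.hive.conf.HiveConf;

public class MaxTasksConfExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Defaults to -1, meaning no limit on the number of tasks per Spark job.
    int maxTasks = conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
    System.out.println("hive.spark.job.max.tasks = " + maxTasks);
  }
}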
http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 753f3a9..5ab3076 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1445,4 +1445,6 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
groupby2_multi_distinct.q,\
groupby3_map_skew_multi_distinct.q,\
groupby3_multi_distinct.q,\
- groupby_grouping_sets7.q
+ groupby_grouping_sets7.q,\
+ spark_job_max_tasks.q
+
http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 67064b8..1457db0 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -287,6 +287,7 @@ public class CliConfigs {
excludesFrom(testConfigProps, "minimr.query.negative.files");
excludeQuery("authorization_uri_import.q");
+ excludeQuery("spark_job_max_tasks.q");
setResultsDir("ql/src/test/results/clientnegative");
setLogDir("itests/qtest/target/qfile-results/clientnegative");
http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 32a7730..98b1605 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -129,8 +129,14 @@ public class SparkTask extends Task<SparkWork> {
// TODO: If the timeout is because of lack of resources in the cluster, we should
// ideally also cancel the app request here. But w/o facilities from Spark or YARN,
// it's difficult to do it on hive side alone. See HIVE-12650.
+ LOG.info("Failed to submit Spark job " + sparkJobID);
+ jobRef.cancelJob();
+ } else if (rc == 4) {
+ LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
+ ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
jobRef.cancelJob();
}
+
if (this.jobID == null) {
this.jobID = sparkJobStatus.getAppID();
}
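The hunk above reuses the existing cancellation path for the new return code 4 (task limit exceeded), alongside the rc == 2 submission-timeout case. A hedged, self-contained sketch of that pattern; SparkJobRef is stubbed here and is not Hive's real interface:

public class ReturnCodeSketch {

  // Stub with only the method this pattern needs; Hive's real SparkJobRef differs.
  interface SparkJobRef {
    boolean cancelJob();
  }

  static void handle(int rc, SparkJobRef jobRef) {
    if (rc == 2) {
      // Submission timed out: cancel so no orphaned job keeps running.
      jobRef.cancelJob();
    } else if (rc == 4) {
      // New in HIVE-16552: the job exceeded hive.spark.job.max.tasks.
      jobRef.cancelJob();
    }
  }

  public static void main(String[] args) {
    handle(4, () -> { System.out.println("job cancelled"); return true; });
  }
}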
http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index dd73f3e..9dfb65e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,7 +34,8 @@ import org.apache.spark.JobExecutionStatus;
* It prints the current job status to the console and sleeps the current thread between monitor intervals.
*/
public class RemoteSparkJobMonitor extends SparkJobMonitor {
-
+ private int sparkJobMaxTaskCount = -1;
+ private int totalTaskCount = 0;
private RemoteSparkJobStatus sparkJobStatus;
private final HiveConf hiveConf;
@@ -42,6 +43,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
super(hiveConf);
this.sparkJobStatus = sparkJobStatus;
this.hiveConf = hiveConf;
+ sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
}
@Override
@@ -100,6 +102,17 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
} else {
console.logInfo(format);
}
+ } else {
+ // Count the number of tasks, and kill the application if it goes beyond the limit.
+ if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
+ totalTaskCount = getTotalTaskCount(progressMap);
+ if (totalTaskCount > sparkJobMaxTaskCount) {
+ rc = 4;
+ done = true;
+ console.printInfo("\nThe total number of task in the Spark job [" + totalTaskCount + "] is greater than the limit [" +
+ sparkJobMaxTaskCount + "]. The Spark job will be cancelled.");
+ }
+ }
}
printStatus(progressMap, lastProgressMap);
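Note that the count is taken at most once per job: totalTaskCount stays 0 until stage progress is first available, and nothing is computed at all when the limit is -1. A hedged, runnable sketch of that guard, with stand-in fields and a hypothetical onProgress entry point:

public class LimitCheckSketch {
  private final int sparkJobMaxTaskCount;  // from hive.spark.job.max.tasks
  private int totalTaskCount = 0;          // 0 until the first count is taken
  private int rc = 0;
  private boolean done = false;

  LimitCheckSketch(int maxTasks) { this.sparkJobMaxTaskCount = maxTasks; }

  void onProgress(int tasksInJob) {
    // Skip entirely when unlimited (-1) or when a count was already taken.
    if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
      totalTaskCount = tasksInJob;
      if (totalTaskCount > sparkJobMaxTaskCount) {
        rc = 4;       // the return code SparkTask now reacts to
        done = true;  // stop monitoring; SparkTask will cancel the job
      }
    }
  }

  public static void main(String[] args) {
    LimitCheckSketch monitor = new LimitCheckSketch(2);
    monitor.onProgress(5);
    System.out.println("rc=" + monitor.rc + ", done=" + monitor.done);  // rc=4, done=true
  }
}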
http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 0b224f2..41730b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -66,7 +66,6 @@ abstract class SparkJobMonitor {
private int lines = 0;
private final PrintStream out;
-
private static final int COLUMN_1_WIDTH = 16;
private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s ";
private static final String STAGE_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s ";
@@ -173,6 +172,15 @@ abstract class SparkJobMonitor {
lastPrintTime = System.currentTimeMillis();
}
+ protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) {
+ int totalTasks = 0;
+ for (SparkStageProgress progress : progressMap.values()) {
+ totalTasks += progress.getTotalTaskCount();
+ }
+
+ return totalTasks;
+ }
+
private String getReport(Map<String, SparkStageProgress> progressMap) {
StringBuilder reportBuffer = new StringBuilder();
SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
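getTotalTaskCount simply sums the per-stage totals from the progress map. A hedged, self-contained sketch; SparkStageProgress is stubbed with only the accessor the helper uses:

import java.util.LinkedHashMap;
import java.util.Map;

public class TaskCountSketch {

  // Stub; Hive's real SparkStageProgress carries more fields than this.
  static class SparkStageProgress {
    private final int totalTaskCount;
    SparkStageProgress(int totalTaskCount) { this.totalTaskCount = totalTaskCount; }
    int getTotalTaskCount() { return totalTaskCount; }
  }

  // Mirrors the helper above: sum the task totals of every stage in the job.
  static int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) {
    int totalTasks = 0;
    for (SparkStageProgress progress : progressMap.values()) {
      totalTasks += progress.getTotalTaskCount();
    }
    return totalTasks;
  }

  public static void main(String[] args) {
    Map<String, SparkStageProgress> progressMap = new LinkedHashMap<>();
    progressMap.put("Stage-1", new SparkStageProgress(3));
    progressMap.put("Stage-2", new SparkStageProgress(2));
    System.out.println(getTotalTaskCount(progressMap));  // 5
  }
}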
http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_job_max_tasks.q b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
new file mode 100644
index 0000000..7473050
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
@@ -0,0 +1,6 @@
+set hive.spark.job.max.tasks=2;
+
+EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
new file mode 100644
index 0000000..ba2f09e
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
@@ -0,0 +1,77 @@
+PREHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 3 <- Reducer 2 (SORT, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: src1
+ Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: key, value
+ Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(value)
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: double)
+ Reducer 2
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ keys: KEY._col0 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col1 (type: double)
+ sort order: +
+ Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string)
+ Reducer 3
+ Reduce Operator Tree:
+ Select Operator
+ expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 4 from org.apache.hadoop.hive.ql.exec.spark.SparkTask