You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 22:17:29 UTC

[12/50] [abbrv] hive git commit: HIVE-16552: Limit the number of tasks a Spark job may contain (Reviewed by Rui)

HIVE-16552: Limit the number of tasks a Spark job may contain (Reviewed by Rui)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c6b5ad66
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c6b5ad66
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c6b5ad66

Branch: refs/heads/hive-14535
Commit: c6b5ad663d235c15fc5bb5a24a1d3e9ac0d05140
Parents: 9e9356b
Author: Xuefu Zhang <xu...@uber.com>
Authored: Thu May 4 09:31:28 2017 -0700
Committer: Xuefu Zhang <xu...@uber.com>
Committed: Thu May 4 09:31:28 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../test/resources/testconfiguration.properties |  4 +-
 .../hadoop/hive/cli/control/CliConfigs.java     |  1 +
 .../hadoop/hive/ql/exec/spark/SparkTask.java    |  6 ++
 .../spark/status/RemoteSparkJobMonitor.java     | 15 +++-
 .../ql/exec/spark/status/SparkJobMonitor.java   | 10 ++-
 .../clientnegative/spark_job_max_tasks.q        |  6 ++
 .../spark/spark_job_max_tasks.q.out             | 77 ++++++++++++++++++++
 8 files changed, 118 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 84398c6..99c26ce 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3350,6 +3350,8 @@ public class HiveConf extends Configuration {
         "hive.spark.use.groupby.shuffle", true,
         "Spark groupByKey transformation has better performance but uses unbounded memory." +
             "Turn this off when there is a memory issue."),
+    SPARK_JOB_MAX_TASKS("hive.spark.job.max.tasks", -1, "The maximum number of tasks a Spark job may have.\n" +
+            "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 753f3a9..5ab3076 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1445,4 +1445,6 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
   groupby2_multi_distinct.q,\
   groupby3_map_skew_multi_distinct.q,\
   groupby3_multi_distinct.q,\
-  groupby_grouping_sets7.q
+  groupby_grouping_sets7.q,\
+  spark_job_max_tasks.q
+

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 67064b8..1457db0 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -287,6 +287,7 @@ public class CliConfigs {
 
         excludesFrom(testConfigProps, "minimr.query.negative.files");
         excludeQuery("authorization_uri_import.q");
+        excludeQuery("spark_job_max_tasks.q");
 
         setResultsDir("ql/src/test/results/clientnegative");
         setLogDir("itests/qtest/target/qfile-results/clientnegative");

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 32a7730..98b1605 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -129,8 +129,14 @@ public class SparkTask extends Task<SparkWork> {
         // TODO: If the timeout is because of lack of resources in the cluster, we should
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
+        LOG.info("Failed to submit Spark job " + sparkJobID);
+        jobRef.cancelJob();
+      } else if (rc == 4) {
+        LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
+            ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
         jobRef.cancelJob();
       }
+
       if (this.jobID == null) {
         this.jobID = sparkJobStatus.getAppID();
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index dd73f3e..9dfb65e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,7 +34,8 @@ import org.apache.spark.JobExecutionStatus;
  * It print current job status to console and sleep current thread between monitor interval.
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
-
+  private int sparkJobMaxTaskCount = -1;
+  private int totalTaskCount = 0;
   private RemoteSparkJobStatus sparkJobStatus;
   private final HiveConf hiveConf;
 
@@ -42,6 +43,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
     super(hiveConf);
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
+    sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
   }
 
   @Override
@@ -100,6 +102,17 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
               } else {
                 console.logInfo(format);
               }
+            } else {
+              // Count the number of tasks, and kill application if it goes beyond the limit.
+              if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
+                totalTaskCount = getTotalTaskCount(progressMap);
+                if (totalTaskCount > sparkJobMaxTaskCount) {
+                  rc = 4;
+                  done = true;
+                  console.printInfo("\nThe total number of task in the Spark job [" + totalTaskCount + "] is greater than the limit [" +
+                      sparkJobMaxTaskCount + "]. The Spark job will be cancelled.");
+                }
+              }
             }
 
             printStatus(progressMap, lastProgressMap);

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 0b224f2..41730b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -66,7 +66,6 @@ abstract class SparkJobMonitor {
   private int lines = 0;
   private final PrintStream out;
 
-
   private static final int COLUMN_1_WIDTH = 16;
   private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  %7s  %6s  ";
   private static final String STAGE_FORMAT = "%-16s%10s %13s  %5s  %9s  %7s  %7s  %6s  ";
@@ -173,6 +172,15 @@ abstract class SparkJobMonitor {
     lastPrintTime = System.currentTimeMillis();
   }
 
+  protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) {
+    int totalTasks = 0;
+    for (SparkStageProgress progress: progressMap.values() ) {
+      totalTasks += progress.getTotalTaskCount();
+    }
+
+    return totalTasks;
+  }
+
   private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_job_max_tasks.q b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
new file mode 100644
index 0000000..7473050
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_job_max_tasks.q
@@ -0,0 +1,6 @@
+set hive.spark.job.max.tasks=2;
+
+EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;

http://git-wip-us.apache.org/repos/asf/hive/blob/c6b5ad66/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
new file mode 100644
index 0000000..ba2f09e
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
@@ -0,0 +1,77 @@
+PREHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 3 <- Reducer 2 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: key, value
+                    Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: sum(value)
+                      keys: key (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: double)
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col1 (type: double)
+                  sort order: +
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string)
+        Reducer 3 
+            Reduce Operator Tree:
+              Select Operator
+                expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 4 from org.apache.hadoop.hive.ql.exec.spark.SparkTask