You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kg...@apache.org on 2020/03/03 13:45:35 UTC
[hive] branch master updated: HIVE-22872: Support multiple
executors for scheduled queries (Zoltan Haindrich reviewed by Jesus Camacho
Rodriguez)
This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9cdf97f HIVE-22872: Support multiple executors for scheduled queries (Zoltan Haindrich reviewed by Jesus Camacho Rodriguez)
9cdf97f is described below
commit 9cdf97f3f851fd835f3c5caae676e1cd737816ec
Author: Zoltan Haindrich <ki...@rxd.hu>
AuthorDate: Tue Mar 3 13:34:18 2020 +0000
HIVE-22872: Support multiple executors for scheduled queries (Zoltan Haindrich reviewed by Jesus Camacho Rodriguez)
Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../upgrade/hive/hive-schema-4.0.0.hive.sql | 7 +-
.../exec/schq/ScheduledQueryMaintenanceTask.java | 8 +-
.../scheduled/ScheduledQueryExecutionContext.java | 4 +
.../scheduled/ScheduledQueryExecutionService.java | 170 +++++++++++++++++----
.../hive/ql/schq/TestScheduledQueryService.java | 4 -
ql/src/test/queries/clientpositive/schq_analyze.q | 2 +-
.../queries/clientpositive/schq_materialized.q | 2 +-
.../clientpositive/llap/schq_materialized.q.out | 2 +-
.../test/results/clientpositive/llap/sysdb.q.out | 10 +-
.../results/clientpositive/llap/sysdb_schq.q.out | 6 +-
.../hadoop/hive/metastore/MetastoreTaskThread.java | 10 +-
.../hadoop/hive/metastore/utils/package-info.java | 22 ---
.../apache/hadoop/hive/metastore/ObjectStore.java | 23 ++-
.../ScheduledQueryExecutionsMaintTask.java | 7 +
.../hive/metastore/model/MScheduledQuery.java | 9 ++
.../src/main/resources/package.jdo | 4 +
.../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 3 +-
.../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 2 +
.../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 29 ++++
.../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 30 ++++
.../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 1 +
.../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 2 +
.../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 1 +
.../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 2 +
.../sql/postgres/hive-schema-4.0.0.postgres.sql | 1 +
.../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 2 +
27 files changed, 290 insertions(+), 75 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3d4e9e0..7ea2de9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4873,6 +4873,8 @@ public class HiveConf extends Configuration {
HIVE_SECURITY_AUTHORIZATION_SCHEDULED_QUERIES_SUPPORTED("hive.security.authorization.scheduled.queries.supported",
false,
"Enable this if the configured authorizer is able to handle scheduled query related calls."),
+ HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS("hive.scheduled.queries.max.executors", 4, new RangeValidator(1, null),
+ "Maximal number of scheduled query executors to allow."),
HIVE_QUERY_RESULTS_CACHE_ENABLED("hive.query.results.cache.enabled", true,
"If the query results cache is enabled. This will keep results of previously executed queries " +
diff --git a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
index fde6f02..03540bb 100644
--- a/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
+++ b/metastore/scripts/upgrade/hive/hive-schema-4.0.0.hive.sql
@@ -1211,6 +1211,7 @@ CREATE EXTERNAL TABLE IF NOT EXISTS `SCHEDULED_QUERIES` (
`USER` string,
`QUERY` string,
`NEXT_EXECUTION` bigint,
+ `ACTIVE_EXECUTION_ID` bigint,
CONSTRAINT `SYS_PK_SCHEDULED_QUERIES` PRIMARY KEY (`SCHEDULED_QUERY_ID`) DISABLE
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
@@ -1225,7 +1226,8 @@ TBLPROPERTIES (
\"SCHEDULE\",
\"USER\",
\"QUERY\",
- \"NEXT_EXECUTION\"
+ \"NEXT_EXECUTION\",
+ \"ACTIVE_EXECUTION_ID\"
FROM
\"SCHEDULED_QUERIES\""
);
@@ -1795,7 +1797,8 @@ select
`SCHEDULE`,
`USER`,
`QUERY`,
- FROM_UNIXTIME(NEXT_EXECUTION) as NEXT_EXECUTION
+ FROM_UNIXTIME(NEXT_EXECUTION) as NEXT_EXECUTION,
+ `ACTIVE_EXECUTION_ID`
FROM
SYS.SCHEDULED_QUERIES
;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
index fd0c173..5abfa4d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/schq/ScheduledQueryMaintenanceTask.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.schq;
import org.apache.hadoop.hive.metastore.api.ScheduledQueryMaintenanceRequest;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryMaintenanceRequestType;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -46,7 +47,11 @@ public class ScheduledQueryMaintenanceTask extends Task<ScheduledQueryMaintenanc
ScheduledQueryMaintenanceRequest request = buildScheduledQueryRequest();
try {
Hive.get().getMSC().scheduledQueryMaintenance(request);
- if (work.getScheduledQuery().isSetNextExecution()) {
+ if (work.getScheduledQuery().isSetNextExecution()
+ || request.getType() == ScheduledQueryMaintenanceRequestType.CREATE) {
+ // we might have a scheduled query available for execution; immediately:
+ // * in case a schedule is altered to be executed at a specific time
+ // * in case we created a new scheduled query - e.g. one scheduled to run every second
ScheduledQueryExecutionService.forceScheduleCheck();
}
} catch (TException | HiveException e) {
@@ -68,5 +73,4 @@ public class ScheduledQueryMaintenanceTask extends Task<ScheduledQueryMaintenanc
public StageType getType() {
return StageType.SCHEDULED_QUERY_MAINT;
}
-
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
index 1bb24ee..32cb316 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
@@ -66,4 +66,8 @@ public class ScheduledQueryExecutionContext {
return conf.getTimeVar(ConfVars.HIVE_SCHEDULED_QUERIES_EXECUTOR_PROGRESS_REPORT_INTERVAL, TimeUnit.MILLISECONDS);
}
+ public int getNumberOfExecutors() {
+ return conf.getIntVar(ConfVars.HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS);
+ }
+
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
index 9a6237c..8443b3f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
@@ -19,13 +19,18 @@ package org.apache.hadoop.hive.ql.scheduled;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.QueryState;
import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
@@ -48,18 +53,29 @@ public class ScheduledQueryExecutionService implements Closeable {
private static ScheduledQueryExecutionService INSTANCE = null;
private ScheduledQueryExecutionContext context;
- private ScheduledQueryExecutor worker;
private AtomicInteger forcedScheduleCheckCounter = new AtomicInteger();
+ private AtomicInteger usedExecutors = new AtomicInteger(0);
+ private Queue<ScheduledQueryExecutor> runningExecutors = new ConcurrentLinkedQueue<>();
public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf inputConf) {
HiveConf conf = new HiveConf(inputConf);
MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf);
- ExecutorService executor = Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build());
+ ExecutorService executor = buildExecutor(conf);
ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
return startScheduledQueryExecutorService(ctx);
}
+ private static ExecutorService buildExecutor(HiveConf conf) {
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build();
+ int systemThreads = 2; // poller,reporter
+ int minServiceThreads = 1; // always keep 1 thread to be used for executing scheduled queries
+ int maxServiceThreads = conf.getIntVar(ConfVars.HIVE_SCHEDULED_QUERIES_MAX_EXECUTORS);
+ return new ThreadPoolExecutor(systemThreads + minServiceThreads, systemThreads + maxServiceThreads,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ threadFactory);
+ }
+
public static ScheduledQueryExecutionService startScheduledQueryExecutorService(ScheduledQueryExecutionContext ctx) {
synchronized (ScheduledQueryExecutionService.class) {
if (INSTANCE != null) {
@@ -73,7 +89,7 @@ public class ScheduledQueryExecutionService implements Closeable {
private ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) {
context = ctx;
- ctx.executor.submit(worker = new ScheduledQueryExecutor());
+ ctx.executor.submit(new ScheduledQueryPoller());
ctx.executor.submit(new ProgressReporter());
}
@@ -81,23 +97,52 @@ public class ScheduledQueryExecutionService implements Closeable {
return state == QueryState.FINISHED || state == QueryState.FAILED;
}
- class ScheduledQueryExecutor implements Runnable {
+ /**
+ * Renames the {@link Thread} to make it more clear what it is working on.
+ */
+ static class NamedThread implements Closeable {
+ private final String oldName;
- private ScheduledQueryProgressInfo info;
+ public NamedThread(String newName) {
+ LOG.info("Starting {} thread - renaming accordingly.", newName);
+ oldName = Thread.currentThread().getName();
+ Thread.currentThread().setName(newName);
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Thread finished; renaming back to: {}", oldName);
+ Thread.currentThread().setName(oldName);
+ }
+ }
+
+ /**
+ * The poller is responsible for checking for available scheduled queries.
+ *
+ * It also handles forced wakeup calls, which reduce the impact of the default check period possibly being minutes long.
+ * There may be only one running poller service at a time in a hiveserver instance.
+ */
+ class ScheduledQueryPoller implements Runnable {
@Override
public void run() {
- while (true) {
- ScheduledQueryPollResponse q = context.schedulerService.scheduledQueryPoll();
- if (q.isSetExecutionId()) {
- try{
- processQuery(q);
- } catch (Throwable t) {
- LOG.error("Unexpected exception during scheduled query processing", t);
+ try (NamedThread namedThread = new NamedThread("Scheduled Query Poller")) {
+ while (!context.executor.isShutdown()) {
+ int origResets = forcedScheduleCheckCounter.get();
+ if (usedExecutors.get() < context.getNumberOfExecutors()) {
+ try {
+ ScheduledQueryPollResponse q = context.schedulerService.scheduledQueryPoll();
+ if (q.isSetExecutionId()) {
+ context.executor.submit(new ScheduledQueryExecutor(q));
+ // skip the sleep and poll again right away if there is an available executor
+ continue;
+ }
+ } catch (Throwable t) {
+ LOG.error("Unexpected exception during scheduled query submission", t);
+ }
}
- } else {
try {
- sleep(context.getIdleSleepTime());
+ sleep(context.getIdleSleepTime(), origResets);
} catch (InterruptedException e) {
LOG.warn("interrupt discarded");
}
@@ -105,9 +150,8 @@ public class ScheduledQueryExecutionService implements Closeable {
}
}
- private void sleep(long idleSleepTime) throws InterruptedException {
+ private void sleep(long idleSleepTime, int origResets) throws InterruptedException {
long checkIntrvalMs = 1000;
- int origResets = forcedScheduleCheckCounter.get();
for (long i = 0; i < idleSleepTime; i += checkIntrvalMs) {
Thread.sleep(checkIntrvalMs);
if (forcedScheduleCheckCounter.get() != origResets) {
@@ -116,6 +160,47 @@ public class ScheduledQueryExecutionService implements Closeable {
}
}
+ }
+
+ private void executorStarted(ScheduledQueryExecutor executor) {
+ runningExecutors.add(executor);
+ usedExecutors.incrementAndGet();
+ }
+
+ private void executorStopped(ScheduledQueryExecutor executor) {
+ usedExecutors.decrementAndGet();
+ runningExecutors.remove(executor);
+ forceScheduleCheck();
+ }
+
+ /**
+ * Responsible for a single execution of a scheduled query.
+ *
+ * The execution happens in a separate thread.
+ */
+ class ScheduledQueryExecutor implements Runnable {
+
+ private ScheduledQueryProgressInfo info;
+ private final ScheduledQueryPollResponse pollResponse;
+
+ public ScheduledQueryExecutor(ScheduledQueryPollResponse pollResponse) {
+ this.pollResponse = pollResponse;
+ executorStarted(this);
+ }
+
+ public void run() {
+ try (NamedThread namedThread = new NamedThread(getThreadName())) {
+ processQuery(pollResponse);
+ } finally {
+ executorStopped(this);
+ }
+ }
+
+ private String getThreadName() {
+ return String.format("Scheduled Query Executor(schedule:%s, execution_id:%d)",
+ pollResponse.getScheduleKey().getScheduleName(), pollResponse.getExecutionId());
+ }
+
public synchronized void reportQueryProgress() {
if (info != null) {
LOG.info("Reporting query progress of {} as {} err:{}", info.getScheduledExecutionId(), info.getState(),
@@ -128,10 +213,12 @@ public class ScheduledQueryExecutionService implements Closeable {
}
private void processQuery(ScheduledQueryPollResponse q) {
- SessionState state = null;
+ LOG.info("Executing schq:{}, executionId: {}", q.getScheduleKey().getScheduleName(), q.getExecutionId());
info = new ScheduledQueryProgressInfo();
- info.setScheduledExecutionId(q.getExecutionId());
+ info.setScheduledExecutionId(pollResponse.getExecutionId());
info.setState(QueryState.EXECUTING);
+ info.setExecutorQueryId(buildExecutorQueryId(""));
+ SessionState state = null;
try {
HiveConf conf = new HiveConf(context.conf);
conf.set(Constants.HIVE_QUERY_EXCLUSIVE_LOCK, lockNameFor(q.getScheduleKey()));
@@ -162,7 +249,11 @@ public class ScheduledQueryExecutionService implements Closeable {
}
private String buildExecutorQueryId(IDriver driver) {
- return String.format("%s/%s", context.executorHostName, driver.getQueryState().getQueryId());
+ return buildExecutorQueryId(driver.getQueryState().getQueryId());
+ }
+
+ private String buildExecutorQueryId(String queryId) {
+ return String.format("%s/%s", context.executorHostName, queryId);
}
private String lockNameFor(ScheduledQueryKey scheduleKey) {
@@ -179,43 +270,56 @@ public class ScheduledQueryExecutionService implements Closeable {
}
}
+ /**
+ * Reports progress periodically.
+ *
+ * To retain the running state of all the in-flight scheduled query executions,
+ * this class initiates a reporting round periodically.
+ */
class ProgressReporter implements Runnable {
@Override
public void run() {
- while (true) {
- try {
- Thread.sleep(context.getProgressReporterSleepTime());
- } catch (InterruptedException e) {
- LOG.warn("interrupt discarded");
- }
- try {
- worker.reportQueryProgress();
- } catch (Exception e) {
- LOG.error("ProgressReporter encountered exception ", e);
+ try (NamedThread namedThread = new NamedThread("Scheduled Query Progress Reporter")) {
+ while (!context.executor.isShutdown()) {
+ try {
+ Thread.sleep(context.getProgressReporterSleepTime());
+ } catch (InterruptedException e) {
+ LOG.warn("interrupt discarded");
+ }
+ try {
+ for (ScheduledQueryExecutor worker : runningExecutors) {
+ worker.reportQueryProgress();
+ }
+ } catch (Exception e) {
+ LOG.error("ProgressReporter encountered exception ", e);
+ }
}
}
}
}
- @VisibleForTesting
@Override
public void close() throws IOException {
synchronized (ScheduledQueryExecutionService.class) {
if (INSTANCE == null || INSTANCE != this) {
throw new IllegalStateException("The current ScheduledQueryExecutionService INSTANCE is invalid");
}
- INSTANCE = null;
context.executor.shutdown();
+ forceScheduleCheck();
try {
context.executor.awaitTermination(1, TimeUnit.SECONDS);
context.executor.shutdownNow();
+ INSTANCE = null;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
+ /**
+ * Forces the poller thread to re-check schedules before the normal timeout happens.
+ */
public static void forceScheduleCheck() {
INSTANCE.forcedScheduleCheckCounter.incrementAndGet();
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
index a8fe0c3..dd8da34 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
@@ -113,7 +113,6 @@ public class TestScheduledQueryService {
@Override
public ScheduledQueryPollResponse scheduledQueryPoll() {
-
ScheduledQueryPollResponse r = new ScheduledQueryPollResponse();
r.setExecutionId(id++);
r.setQuery(stmt);
@@ -154,8 +153,6 @@ public class TestScheduledQueryService {
MockScheduledQueryService qService = new MockScheduledQueryService("insert into tu values(1),(2),(3),(4),(5)");
ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
try (ScheduledQueryExecutionService sQ = ScheduledQueryExecutionService.startScheduledQueryExecutorService(ctx)) {
-
- executor.shutdown();
// Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough.
SessionState.getConsole().logInfo("Waiting for query execution to finish ...");
synchronized (qService.notifier) {
@@ -163,7 +160,6 @@ public class TestScheduledQueryService {
}
SessionState.getConsole().logInfo("Done waiting for query execution!");
}
- executor.shutdownNow();
assertThat(qService.lastProgressInfo.isSetExecutorQueryId(), is(true));
assertThat(qService.lastProgressInfo.getExecutorQueryId(),
diff --git a/ql/src/test/queries/clientpositive/schq_analyze.q b/ql/src/test/queries/clientpositive/schq_analyze.q
index 969b47b..3c03360 100644
--- a/ql/src/test/queries/clientpositive/schq_analyze.q
+++ b/ql/src/test/queries/clientpositive/schq_analyze.q
@@ -21,7 +21,7 @@ create scheduled query t_analyze cron '0 */1 * * * ? *' as analyze table t compu
alter scheduled query t_analyze execute;
-!sleep 3;
+!sleep 10;
select * from information_schema.scheduled_executions s where schedule_name='ex_analyze' order by scheduled_execution_id desc limit 3;
diff --git a/ql/src/test/queries/clientpositive/schq_materialized.q b/ql/src/test/queries/clientpositive/schq_materialized.q
index 6baed49..9848f9f 100644
--- a/ql/src/test/queries/clientpositive/schq_materialized.q
+++ b/ql/src/test/queries/clientpositive/schq_materialized.q
@@ -71,7 +71,7 @@ select `(NEXT_EXECUTION|SCHEDULED_QUERY_ID)?+.+` from sys.scheduled_queries;
alter scheduled query d execute;
-!sleep 3;
+!sleep 10;
-- the scheduled execution will fail - because of missing TXN; but overall it works..
select state,error_message from sys.scheduled_executions;
diff --git a/ql/src/test/results/clientpositive/llap/schq_materialized.q.out b/ql/src/test/results/clientpositive/llap/schq_materialized.q.out
index e904d46..194a1b3 100644
--- a/ql/src/test/results/clientpositive/llap/schq_materialized.q.out
+++ b/ql/src/test/results/clientpositive/llap/schq_materialized.q.out
@@ -277,7 +277,7 @@ POSTHOOK: query: select `(NEXT_EXECUTION|SCHEDULED_QUERY_ID)?+.+` from sys.sched
POSTHOOK: type: QUERY
POSTHOOK: Input: sys@scheduled_queries
#### A masked pattern was here ####
-d true hive 0 0 * * * ? * hive_admin_user alter materialized view `default`.`mv1` rebuild
+d true hive 0 0 * * * ? * hive_admin_user alter materialized view `default`.`mv1` rebuild NULL
PREHOOK: query: alter scheduled query d execute
PREHOOK: type: ALTER SCHEDULED QUERY
POSTHOOK: query: alter scheduled query d execute
diff --git a/ql/src/test/results/clientpositive/llap/sysdb.q.out b/ql/src/test/results/clientpositive/llap/sysdb.q.out
index 38cadf3..8b0be82 100644
--- a/ql/src/test/results/clientpositive/llap/sysdb.q.out
+++ b/ql/src/test/results/clientpositive/llap/sysdb.q.out
@@ -813,6 +813,8 @@ scheduled_executions start_time
scheduled_executions start_time
scheduled_executions state
scheduled_executions state
+scheduled_queries active_execution_id
+scheduled_queries active_execution_id
scheduled_queries cluster_namespace
scheduled_queries cluster_namespace
scheduled_queries enabled
@@ -1069,8 +1071,8 @@ POSTHOOK: Input: sys@columns_v2
a decimal(10,2) 0
acquired_at string 9
action_expression string 4
-add_time int 1
-agent_info string 6
+active_execution_id bigint 8
+active_execution_id bigint 8
PREHOOK: query: select param_key, param_value from database_params order by param_key, param_value limit 5
PREHOOK: type: QUERY
PREHOOK: Input: sys@database_params
@@ -1419,9 +1421,9 @@ POSTHOOK: Input: sys@table_params
#### A masked pattern was here ####
COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true","c":"true","d":"true","e":"true","f":"true","g":"true"}}
COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"action_expression":"true","name":"true","ns":"true","rp_name":"true","trigger_expression":"true"}}
+COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"active_execution_id":"true","cluster_namespace":"true","enabled":"true","next_execution":"true","query":"true","schedule":"true","schedule_name":"true","scheduled_query_id":"true","user":"true"}}
COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"add_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","principal_name":"true","principal_type":"true","role_grant_id":"true","role_id":"true"}}
COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"alloc_fraction":"true","ns":"true","path":"true","query_parallelism":"true","rp_name":"true","scheduling_policy":"true"}}
-COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"authorizer":"true","column_name":"true","create_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","part_col_priv":"true","part_column_grant_id":"true","part_id":"true","principal_name":"true","principal_type":"true"}}
PREHOOK: query: select tbl_name from tbls order by tbl_name limit 5
PREHOOK: type: QUERY
PREHOOK: Input: sys@tbls
@@ -1536,9 +1538,9 @@ POSTHOOK: Input: sys@table_stats_view
#### A masked pattern was here ####
{"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true","c":"true","d":"true","e":"true","f":"true","g":"true"}} 0 0 0 0
{"BASIC_STATS":"true","COLUMN_STATS":{"action_expression":"true","name":"true","ns":"true","rp_name":"true","trigger_expression":"true"}} 0 0 0 0
+{"BASIC_STATS":"true","COLUMN_STATS":{"active_execution_id":"true","cluster_namespace":"true","enabled":"true","next_execution":"true","query":"true","schedule":"true","schedule_name":"true","scheduled_query_id":"true","user":"true"}} 0 0 0 0
{"BASIC_STATS":"true","COLUMN_STATS":{"add_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","principal_name":"true","principal_type":"true","role_grant_id":"true","role_id":"true"}} 0 0 0 0
{"BASIC_STATS":"true","COLUMN_STATS":{"alloc_fraction":"true","ns":"true","path":"true","query_parallelism":"true","rp_name":"true","scheduling_policy":"true"}} 0 0 0 0
-{"BASIC_STATS":"true","COLUMN_STATS":{"authorizer":"true","column_name":"true","create_time":"true","grant_option":"true","grantor":"true","grantor_type":"true","part_col_priv":"true","part_column_grant_id":"true","part_id":"true","principal_name":"true","principal_type":"true"}} 0 0 0 0
PREHOOK: query: select COLUMN_STATS_ACCURATE, NUM_FILES, NUM_ROWS, RAW_DATA_SIZE, TOTAL_SIZE FROM PARTITION_STATS_VIEW where COLUMN_STATS_ACCURATE is not null order by NUM_FILES, NUM_ROWS, RAW_DATA_SIZE limit 5
PREHOOK: type: QUERY
PREHOOK: Input: sys@partition_params
diff --git a/ql/src/test/results/clientpositive/llap/sysdb_schq.q.out b/ql/src/test/results/clientpositive/llap/sysdb_schq.q.out
index 8745e3b..3239c36 100644
--- a/ql/src/test/results/clientpositive/llap/sysdb_schq.q.out
+++ b/ql/src/test/results/clientpositive/llap/sysdb_schq.q.out
@@ -27,6 +27,7 @@ schedule string from deserializer
user string from deserializer
query string from deserializer
next_execution bigint from deserializer
+active_execution_id bigint from deserializer
# Detailed Table Information
Database: sys
@@ -35,7 +36,7 @@ Retention: 0
#### A masked pattern was here ####
Table Type: EXTERNAL_TABLE
Table Parameters:
- COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"cluster_namespace\":\"true\",\"enabled\":\"true\",\"next_execution\":\"true\",\"query\":\"true\",\"schedule\":\"true\",\"schedule_name\":\"true\",\"scheduled_query_id\":\"true\",\"user\":\"true\"}}
+ COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"active_execution_id\":\"true\",\"cluster_namespace\":\"true\",\"enabled\":\"true\",\"next_execution\":\"true\",\"query\":\"true\",\"schedule\":\"true\",\"schedule_name\":\"true\",\"scheduled_query_id\":\"true\",\"user\":\"true\"}}
EXTERNAL TRUE
bucketing_version 2
hive.sql.database.type METASTORE
@@ -47,7 +48,8 @@ Table Parameters:
\"SCHEDULE\",
\"USER\",
\"QUERY\",
- \"NEXT_EXECUTION\"
+ \"NEXT_EXECUTION\",
+ \"ACTIVE_EXECUTION_ID\"
FROM
\"SCHEDULED_QUERIES\"
numFiles 0
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
index e5d21b0..d56bc2a 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.metastore;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-
import java.util.concurrent.TimeUnit;
/**
@@ -35,4 +34,13 @@ public interface MetastoreTaskThread extends Configurable, Runnable {
* @return frequency
*/
long runFrequency(TimeUnit unit);
+
+ /**
+ * Gets the initial delay before the first execution.
+ *
+ * Defaults to {@link #runFrequency(TimeUnit)}
+ */
+ default long initialDelay(TimeUnit unit) {
+ return runFrequency(unit);
+ }
}
diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java
deleted file mode 100644
index 2eb51c8..0000000
--- a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package consisting the utility methods for metastore.
- */
-package org.apache.hadoop.hive.metastore.utils;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 1a5944d..8a826d2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -12667,7 +12667,7 @@ public class ObjectStore implements RawStore, Configurable {
try {
openTransaction();
Query q = pm.newQuery(MScheduledQuery.class,
- "nextExecution <= now && enabled && clusterNamespace == ns");
+ "nextExecution <= now && enabled && clusterNamespace == ns && activeExecution == null");
q.setSerializeRead(true);
q.declareParameters("java.lang.Integer now, java.lang.String ns");
q.setOrdering("nextExecution");
@@ -12685,6 +12685,7 @@ public class ObjectStore implements RawStore, Configurable {
execution.setState(QueryState.INITED);
execution.setStartTime(now);
execution.setLastUpdateTime(now);
+ schq.setActiveExecution(execution);
pm.makePersistent(execution);
pm.makePersistent(schq);
ObjectStoreTestHook.onScheduledQueryPoll();
@@ -12735,6 +12736,7 @@ public class ObjectStore implements RawStore, Configurable {
case TIMED_OUT:
execution.setEndTime((int) (System.currentTimeMillis() / 1000));
execution.setLastUpdateTime(null);
+ execution.getScheduledQuery().setActiveExecution(null);
break;
default:
throw new InvalidOperationException("invalid state: " + info.getState());
@@ -12967,6 +12969,8 @@ public class ObjectStore implements RawStore, Configurable {
// info.set
scheduledQueryProgress(info);
}
+
+ recoverInvalidScheduledQueryState(timeoutSecs);
committed = commitTransaction();
return results.size();
} finally {
@@ -12975,4 +12979,21 @@ public class ObjectStore implements RawStore, Configurable {
}
}
}
+
+ private void recoverInvalidScheduledQueryState(int timeoutSecs) {
+ int maxLastUpdateTime = (int) (System.currentTimeMillis() / 1000) - timeoutSecs;
+ Query q = pm.newQuery(MScheduledQuery.class);
+ q.setFilter("activeExecution != null");
+
+ List<MScheduledQuery> results = (List<MScheduledQuery>) q.execute();
+ for (MScheduledQuery e : results) {
+ Integer lastUpdateTime = e.getActiveExecution().getLastUpdateTime();
+ if (lastUpdateTime == null || lastUpdateTime < maxLastUpdateTime) {
+ LOG.error("Scheduled query: {} stuck with an activeExecution - clearing",
+ scheduledQueryKeyRef(e.getScheduleKey()));
+ e.setActiveExecution(null);
+ pm.makePersistent(e);
+ }
+ }
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java
index d678d01..4c1b34d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ScheduledQueryExecutionsMaintTask.java
@@ -35,6 +35,13 @@ public class ScheduledQueryExecutionsMaintTask implements MetastoreTaskThread {
private Configuration conf;
@Override
+ public long initialDelay(TimeUnit unit) {
+ // no delay before the first execution;
+ // after an ungraceful shutdown it might otherwise take time to notice that in-flight scheduled queries are not running anymore
+ return 0;
+ }
+
+ @Override
public long runFrequency(TimeUnit unit) {
return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.SCHEDULED_QUERIES_EXECUTION_MAINT_TASK_FREQUENCY,
unit);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java
index d055e7d..c80241b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/model/MScheduledQuery.java
@@ -35,6 +35,7 @@ public class MScheduledQuery {
private String user;
private String query;
private Integer nextExecution;
+ private MScheduledExecution activeExecution;
private Set<MScheduledExecution> executions;
public MScheduledQuery(ScheduledQuery s) {
@@ -112,4 +113,12 @@ public class MScheduledQuery {
return user;
}
+ public void setActiveExecution(MScheduledExecution execution) { // stores the given execution as the active one; null clears the reference
+ activeExecution = execution;
+ }
+
+ public MScheduledExecution getActiveExecution() { // returns the stored active execution; may be null when none is set
+ return activeExecution;
+ }
+
}
diff --git a/standalone-metastore/metastore-server/src/main/resources/package.jdo b/standalone-metastore/metastore-server/src/main/resources/package.jdo
index ce919e4..88eabfa 100644
--- a/standalone-metastore/metastore-server/src/main/resources/package.jdo
+++ b/standalone-metastore/metastore-server/src/main/resources/package.jdo
@@ -1471,6 +1471,10 @@
<field name="nextExecution">
<column name="NEXT_EXECUTION" jdbc-type="integer" allows-null="true"/>
</field>
+ <field name="activeExecution"> <!-- maps the activeExecution field to the nullable ACTIVE_EXECUTION_ID column -->
+ <column name="ACTIVE_EXECUTION_ID" allows-null="true"/>
+ <foreign-key name="SCHEDULED_EXECUTIONS_SCHQ_ACTIVE" delete-action="cascade"/> <!-- NOTE(review): cascade here presumably deletes the owning scheduled query row when the referenced execution row is deleted - confirm this is intended rather than delete-action="null" -->
+ </field>
<field name="executions" mapped-by="scheduledQuery" dependent-element="true">
<collection element-type="MScheduledExecution" dependent-element="true" />
<foreign-key name="SCHEDULED_EXECUTIONS_SCHQ_FK" delete-action="cascade"/>
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 29b20e4..48ad676 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -749,7 +749,8 @@ CREATE TABLE "APP"."SCHEDULED_QUERIES" (
"USER" varchar(128) not null,
"SCHEDULE" varchar(256) not null,
"QUERY" varchar(4000) not null,
- "NEXT_EXECUTION" integer not null
+ "NEXT_EXECUTION" integer not null,
+ "ACTIVE_EXECUTION_ID" bigint
);
CREATE INDEX NEXTEXECUTIONINDEX ON APP.SCHEDULED_QUERIES (ENABLED,CLUSTER_NAMESPACE,NEXT_EXECUTION);
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index e8fe6a2..7a230bd 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -58,6 +58,8 @@ ALTER TABLE "APP"."KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY
-- HIVE-21487
CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
+-- HIVE-22872
+ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- nullable column; same name and type as in the derby 4.0.0 schema file
-- This needs to be the last thing done. Insert any changes above this line.
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
\ No newline at end of file
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 955a94b..a2cf981 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1293,6 +1293,35 @@ ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG
INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+CREATE TABLE "SCHEDULED_QUERIES" ( -- MSSQL schema: scheduled queries table, including the ACTIVE_EXECUTION_ID column
+ "SCHEDULED_QUERY_ID" bigint NOT NULL,
+ "CLUSTER_NAMESPACE" VARCHAR(256),
+ "ENABLED" bit NOT NULL DEFAULT 0,
+ "NEXT_EXECUTION" INTEGER,
+ "QUERY" VARCHAR(4000),
+ "SCHEDULE" VARCHAR(256),
+ "SCHEDULE_NAME" VARCHAR(256),
+ "USER" VARCHAR(256),
+ "ACTIVE_EXECUTION_ID" bigint, -- nullable; no FOREIGN KEY constraint is declared for it in this script
+ CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY ("SCHEDULED_QUERY_ID")
+);
+
+CREATE TABLE "SCHEDULED_EXECUTIONS" ( -- MSSQL schema: execution rows, each pointing at a SCHEDULED_QUERIES row
+ "SCHEDULED_EXECUTION_ID" bigint NOT NULL,
+ "END_TIME" INTEGER,
+ "ERROR_MESSAGE" VARCHAR(2000),
+ "EXECUTOR_QUERY_ID" VARCHAR(256),
+ "LAST_UPDATE_TIME" INTEGER,
+ "SCHEDULED_QUERY_ID" bigint,
+ "START_TIME" INTEGER,
+ "STATE" VARCHAR(256),
+ CONSTRAINT SCHEDULED_EXECUTIONS_PK PRIMARY KEY ("SCHEDULED_EXECUTION_ID"),
+ CONSTRAINT SCHEDULED_EXECUTIONS_SCHQ_FK FOREIGN KEY ("SCHEDULED_QUERY_ID") REFERENCES "SCHEDULED_QUERIES"("SCHEDULED_QUERY_ID") ON DELETE CASCADE -- deleting a scheduled query removes its execution rows
+);
+
+CREATE INDEX IDX_SCHEDULED_EX_LAST_UPDATE ON "SCHEDULED_EXECUTIONS" ("LAST_UPDATE_TIME"); -- presumably supports lookups by last update time (e.g. timeout scans) - confirm
+CREATE INDEX IDX_SCHEDULED_EX_SQ_ID ON "SCHEDULED_EXECUTIONS" ("SCHEDULED_QUERY_ID"); -- index on the foreign key column
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index a554f8a..4a58770 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -33,6 +33,36 @@ ALTER TABLE KEY_CONSTRAINTS ADD CONSTRAINT CONSTRAINTS_PK PRIMARY KEY (PARENT_TB
-- HIVE-21487
CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
+CREATE TABLE "SCHEDULED_QUERIES" ( -- MSSQL 3.2.0-to-4.0.0 upgrade: creates the scheduled queries table, including ACTIVE_EXECUTION_ID
+ "SCHEDULED_QUERY_ID" bigint NOT NULL,
+ "CLUSTER_NAMESPACE" VARCHAR(256),
+ "ENABLED" bit NOT NULL DEFAULT 0,
+ "NEXT_EXECUTION" INTEGER,
+ "QUERY" VARCHAR(4000),
+ "SCHEDULE" VARCHAR(256),
+ "SCHEDULE_NAME" VARCHAR(256),
+ "USER" VARCHAR(256),
+ "ACTIVE_EXECUTION_ID" bigint, -- nullable; no FOREIGN KEY constraint is declared for it in this script
+ CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY ("SCHEDULED_QUERY_ID")
+);
+
+CREATE TABLE "SCHEDULED_EXECUTIONS" ( -- MSSQL 3.2.0-to-4.0.0 upgrade: execution rows, each pointing at a SCHEDULED_QUERIES row
+ "SCHEDULED_EXECUTION_ID" bigint NOT NULL,
+ "END_TIME" INTEGER,
+ "ERROR_MESSAGE" VARCHAR(2000),
+ "EXECUTOR_QUERY_ID" VARCHAR(256),
+ "LAST_UPDATE_TIME" INTEGER,
+ "SCHEDULED_QUERY_ID" bigint,
+ "START_TIME" INTEGER,
+ "STATE" VARCHAR(256),
+ CONSTRAINT SCHEDULED_EXECUTIONS_PK PRIMARY KEY ("SCHEDULED_EXECUTION_ID"),
+ CONSTRAINT SCHEDULED_EXECUTIONS_SCHQ_FK FOREIGN KEY ("SCHEDULED_QUERY_ID") REFERENCES "SCHEDULED_QUERIES"("SCHEDULED_QUERY_ID") ON DELETE CASCADE -- deleting a scheduled query removes its execution rows
+);
+
+CREATE INDEX IDX_SCHEDULED_EX_LAST_UPDATE ON "SCHEDULED_EXECUTIONS" ("LAST_UPDATE_TIME"); -- presumably supports lookups by last update time (e.g. timeout scans) - confirm
+CREATE INDEX IDX_SCHEDULED_EX_SQ_ID ON "SCHEDULED_EXECUTIONS" ("SCHEDULED_QUERY_ID"); -- index on the foreign key column
+
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 63c97e6..bc34b51 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1224,6 +1224,7 @@ CREATE TABLE SCHEDULED_QUERIES (
SCHEDULE VARCHAR(256),
SCHEDULE_NAME VARCHAR(256),
`USER` VARCHAR(256),
+ ACTIVE_EXECUTION_ID INTEGER,
CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY (SCHEDULED_QUERY_ID)
);
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index c175d4c..13f03bc 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -62,6 +62,8 @@ ALTER TABLE `KEY_CONSTRAINTS` ADD CONSTRAINT `CONSTRAINTS_PK` PRIMARY KEY (`PARE
-- HIVE-21487
CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
+-- HIVE-22872
+ALTER TABLE SCHEDULED_QUERIES ADD COLUMN ACTIVE_EXECUTION_ID INTEGER ; -- NOTE(review): derby/mssql/postgres use bigint and oracle number(19) for this column; confirm INTEGER is wide enough for execution ids
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index 4338d9c..8482b59 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1199,6 +1199,7 @@ CREATE TABLE "SCHEDULED_QUERIES" (
"SCHEDULE" VARCHAR(256),
"SCHEDULE_NAME" VARCHAR(256),
"USER" VARCHAR(256),
+ "ACTIVE_EXECUTION_ID" number(19),
CONSTRAINT SCHEDULED_QUERIES_PK PRIMARY KEY ("SCHEDULED_QUERY_ID")
);
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index cec7f53..cbfdd86 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -62,6 +62,8 @@ ALTER TABLE KEY_CONSTRAINTS ADD CONSTRAINT CONSTRAINTS_PK PRIMARY KEY (PARENT_TB
-- HIVE-21487
CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
+-- HIVE-22872
+ALTER TABLE SCHEDULED_QUERIES ADD ACTIVE_EXECUTION_ID number(19); -- nullable column; same type as in the oracle 4.0.0 schema file
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index e78fff1..aa35a7a 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1887,6 +1887,7 @@ CREATE TABLE "SCHEDULED_QUERIES" (
"SCHEDULE" VARCHAR(256),
"SCHEDULE_NAME" VARCHAR(256),
"USER" VARCHAR(256),
+ "ACTIVE_EXECUTION_ID" BIGINT,
CONSTRAINT "SCHEDULED_QUERIES_PK" PRIMARY KEY ("SCHEDULED_QUERY_ID")
);
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index 52953f0..9462328 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -193,6 +193,8 @@ ALTER TABLE "KEY_CONSTRAINTS" ADD CONSTRAINT "CONSTRAINTS_PK" PRIMARY KEY ("PARE
-- HIVE-21487
CREATE INDEX "COMPLETED_COMPACTIONS_RES" ON "COMPLETED_COMPACTIONS" ("CC_DATABASE","CC_TABLE","CC_PARTITION");
+-- HIVE-22872
+ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- nullable column; same type as in the postgres 4.0.0 schema file
-- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;