You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kg...@apache.org on 2020/02/17 12:19:00 UTC
[hive] 03/03: HIVE-22873: Make it possible to identify which hs2 instance executed a scheduled query (Zoltan Haindrich reviewed by Miklos Gergely)
This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
commit e97ff5b3df4258fa83a15507000d7e42c2aac8f4
Author: Zoltan Haindrich <ki...@rxd.hu>
AuthorDate: Mon Feb 17 12:07:18 2020 +0000
HIVE-22873: Make it possible to identify which hs2 instance executed a scheduled query (Zoltan Haindrich reviewed by Miklos Gergely)
Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>
---
.../scheduled/ScheduledQueryExecutionContext.java | 11 ++++++++
.../scheduled/ScheduledQueryExecutionService.java | 25 ++++++++++-------
.../hive/ql/schq/TestScheduledQueryService.java | 31 +++++++++++++---------
3 files changed, 45 insertions(+), 22 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
index 9decb8c..1bb24ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.scheduled;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -33,6 +35,7 @@ public class ScheduledQueryExecutionContext {
public final ExecutorService executor;
public final IScheduledQueryMaintenanceService schedulerService;
public final HiveConf conf;
+ public final String executorHostName;
public ScheduledQueryExecutionContext(
ExecutorService executor,
@@ -41,6 +44,14 @@ public class ScheduledQueryExecutionContext {
this.executor = executor;
this.conf = conf;
this.schedulerService = service;
+ try {
+ this.executorHostName = InetAddress.getLocalHost().getHostName();
+ if (executorHostName == null) {
+ throw new RuntimeException("Hostname is null; Can't function without a valid hostname!");
+ }
+ } catch (UnknownHostException e) {
+ throw new RuntimeException("Can't function without a valid hostname!", e);
+ }
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
index 06cfe3f..9a6237c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
@@ -51,23 +51,27 @@ public class ScheduledQueryExecutionService implements Closeable {
private ScheduledQueryExecutor worker;
private AtomicInteger forcedScheduleCheckCounter = new AtomicInteger();
- public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf conf0) {
+ public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf inputConf) {
+ HiveConf conf = new HiveConf(inputConf);
+ MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf);
+ ExecutorService executor = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build());
+ ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
+ return startScheduledQueryExecutorService(ctx);
+ }
+
+ public static ScheduledQueryExecutionService startScheduledQueryExecutorService(ScheduledQueryExecutionContext ctx) {
synchronized (ScheduledQueryExecutionService.class) {
if (INSTANCE != null) {
throw new IllegalStateException(
"There is already a ScheduledQueryExecutionService in service; check it and close it explicitly if neccessary");
}
- HiveConf conf = new HiveConf(conf0);
- MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf);
- ExecutorService executor = Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build());
- ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
INSTANCE = new ScheduledQueryExecutionService(ctx);
return INSTANCE;
}
}
- public ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) {
+ private ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) {
context = ctx;
ctx.executor.submit(worker = new ScheduledQueryExecutor());
ctx.executor.submit(new ProgressReporter());
@@ -138,7 +142,7 @@ public class ScheduledQueryExecutionService implements Closeable {
reportQueryProgress();
try (
IDriver driver = DriverFactory.newDriver(DriverFactory.getNewQueryState(conf), null)) {
- info.setExecutorQueryId(driver.getQueryState().getQueryId());
+ info.setExecutorQueryId(buildExecutorQueryId(driver));
reportQueryProgress();
driver.run(q.getQuery());
info.setState(QueryState.FINISHED);
@@ -153,11 +157,14 @@ public class ScheduledQueryExecutionService implements Closeable {
} catch (Throwable e) {
}
}
-
reportQueryProgress();
}
}
+ private String buildExecutorQueryId(IDriver driver) {
+ return String.format("%s/%s", context.executorHostName, driver.getQueryState().getQueryId());
+ }
+
private String lockNameFor(ScheduledQueryKey scheduleKey) {
return String.format("scheduled_query_%s_%s", scheduleKey.getClusterNamespace(), scheduleKey.getScheduleName());
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
index 9a7b423..a8fe0c3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
@@ -17,15 +17,13 @@
*/
package org.apache.hadoop.hive.ql.schq;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.QueryState;
import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
@@ -100,17 +98,19 @@ public class TestScheduledQueryService {
return res.size();
}
- // Use notify/wait on this object to indicate when the scheduled query has finished executing.
- static Object notifier = new Object();
public static class MockScheduledQueryService implements IScheduledQueryMaintenanceService {
+ // Use notify/wait on this object to indicate when the scheduled query has finished executing.
+ Object notifier = new Object();
+
int id = 0;
private String stmt;
+ ScheduledQueryProgressInfo lastProgressInfo;
public MockScheduledQueryService(String string) {
stmt = string;
}
-
+
@Override
public ScheduledQueryPollResponse scheduledQueryPoll() {
@@ -129,6 +129,7 @@ public class TestScheduledQueryService {
public void scheduledQueryProgress(ScheduledQueryProgressInfo info) {
System.out.printf("%d, state: %s, error: %s", info.getScheduledExecutionId(), info.getState(),
info.getErrorMessage());
+ lastProgressInfo = info;
if (info.getState() == QueryState.FINISHED || info.getState() == QueryState.FAILED) {
// Query is done, notify any waiters
synchronized (notifier) {
@@ -152,17 +153,21 @@ public class TestScheduledQueryService {
HiveConf conf = env_setup.getTestCtx().hiveConf;
MockScheduledQueryService qService = new MockScheduledQueryService("insert into tu values(1),(2),(3),(4),(5)");
ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
- ScheduledQueryExecutionService sQ = new ScheduledQueryExecutionService(ctx);
+ try (ScheduledQueryExecutionService sQ = ScheduledQueryExecutionService.startScheduledQueryExecutorService(ctx)) {
- executor.shutdown();
- // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough.
- SessionState.getConsole().logInfo("Waiting for query execution to finish ...");
- synchronized (notifier) {
- notifier.wait(30000);
+ executor.shutdown();
+ // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough.
+ SessionState.getConsole().logInfo("Waiting for query execution to finish ...");
+ synchronized (qService.notifier) {
+ qService.notifier.wait(30000);
+ }
+ SessionState.getConsole().logInfo("Done waiting for query execution!");
}
- SessionState.getConsole().logInfo("Done waiting for query execution!");
executor.shutdownNow();
+ assertThat(qService.lastProgressInfo.isSetExecutorQueryId(), is(true));
+ assertThat(qService.lastProgressInfo.getExecutorQueryId(),
+ Matchers.containsString(ctx.executorHostName + "/"));
int nr = getNumRowsReturned(driver, "select 1 from tu");
assertThat(nr, Matchers.equalTo(5));