You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kg...@apache.org on 2020/02/17 12:19:00 UTC

[hive] 03/03: HIVE-22873: Make it possible to identify which hs2 instance executed a scheduled query (Zoltan Haindrich reviewed by Miklos Gergely)

This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit e97ff5b3df4258fa83a15507000d7e42c2aac8f4
Author: Zoltan Haindrich <ki...@rxd.hu>
AuthorDate: Mon Feb 17 12:07:18 2020 +0000

    HIVE-22873: Make it possible to identify which hs2 instance executed a scheduled query (Zoltan Haindrich reviewed by Miklos Gergely)
    
    Signed-off-by: Zoltan Haindrich <ki...@rxd.hu>
---
 .../scheduled/ScheduledQueryExecutionContext.java  | 11 ++++++++
 .../scheduled/ScheduledQueryExecutionService.java  | 25 ++++++++++-------
 .../hive/ql/schq/TestScheduledQueryService.java    | 31 +++++++++++++---------
 3 files changed, 45 insertions(+), 22 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
index 9decb8c..1bb24ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionContext.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.scheduled;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -33,6 +35,7 @@ public class ScheduledQueryExecutionContext {
   public final ExecutorService executor;
   public final IScheduledQueryMaintenanceService schedulerService;
   public final HiveConf conf;
+  public final String executorHostName;
 
   public ScheduledQueryExecutionContext(
       ExecutorService executor,
@@ -41,6 +44,14 @@ public class ScheduledQueryExecutionContext {
     this.executor = executor;
     this.conf = conf;
     this.schedulerService = service;
+    try {
+      this.executorHostName = InetAddress.getLocalHost().getHostName();
+      if (executorHostName == null) {
+        throw new RuntimeException("Hostname is null; Can't function without a valid hostname!");
+      }
+    } catch (UnknownHostException e) {
+      throw new RuntimeException("Can't function without a valid hostname!", e);
+    }
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
index 06cfe3f..9a6237c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/scheduled/ScheduledQueryExecutionService.java
@@ -51,23 +51,27 @@ public class ScheduledQueryExecutionService implements Closeable {
   private ScheduledQueryExecutor worker;
   private AtomicInteger forcedScheduleCheckCounter = new AtomicInteger();
 
-  public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf conf0) {
+  public static ScheduledQueryExecutionService startScheduledQueryExecutorService(HiveConf inputConf) {
+    HiveConf conf = new HiveConf(inputConf);
+    MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf);
+    ExecutorService executor = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build());
+    ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
+    return startScheduledQueryExecutorService(ctx);
+  }
+
+  public static ScheduledQueryExecutionService startScheduledQueryExecutorService(ScheduledQueryExecutionContext ctx) {
     synchronized (ScheduledQueryExecutionService.class) {
       if (INSTANCE != null) {
         throw new IllegalStateException(
             "There is already a ScheduledQueryExecutionService in service; check it and close it explicitly if neccessary");
       }
-      HiveConf conf = new HiveConf(conf0);
-      MetastoreBasedScheduledQueryService qService = new MetastoreBasedScheduledQueryService(conf);
-      ExecutorService executor = Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Scheduled Query Thread %d").build());
-      ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
       INSTANCE = new ScheduledQueryExecutionService(ctx);
       return INSTANCE;
     }
   }
 
-  public ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) {
+  private ScheduledQueryExecutionService(ScheduledQueryExecutionContext ctx) {
     context = ctx;
     ctx.executor.submit(worker = new ScheduledQueryExecutor());
     ctx.executor.submit(new ProgressReporter());
@@ -138,7 +142,7 @@ public class ScheduledQueryExecutionService implements Closeable {
         reportQueryProgress();
         try (
           IDriver driver = DriverFactory.newDriver(DriverFactory.getNewQueryState(conf), null)) {
-          info.setExecutorQueryId(driver.getQueryState().getQueryId());
+          info.setExecutorQueryId(buildExecutorQueryId(driver));
           reportQueryProgress();
           driver.run(q.getQuery());
           info.setState(QueryState.FINISHED);
@@ -153,11 +157,14 @@ public class ScheduledQueryExecutionService implements Closeable {
           } catch (Throwable e) {
           }
         }
-
         reportQueryProgress();
       }
     }
 
+    private String buildExecutorQueryId(IDriver driver) {
+      return String.format("%s/%s", context.executorHostName, driver.getQueryState().getQueryId());
+    }
+
     private String lockNameFor(ScheduledQueryKey scheduleKey) {
       return String.format("scheduled_query_%s_%s", scheduleKey.getClusterNamespace(), scheduleKey.getScheduleName());
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
index 9a7b423..a8fe0c3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/schq/TestScheduledQueryService.java
@@ -17,15 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.schq;
 
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.QueryState;
 import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
@@ -100,17 +98,19 @@ public class TestScheduledQueryService {
     return res.size();
   }
 
-  // Use notify/wait on this object to indicate when the scheduled query has finished executing.
-  static Object notifier = new Object();
 
   public static class MockScheduledQueryService implements IScheduledQueryMaintenanceService {
+    // Use notify/wait on this object to indicate when the scheduled query has finished executing.
+    Object notifier = new Object();
+
     int id = 0;
     private String stmt;
+    ScheduledQueryProgressInfo lastProgressInfo;
 
     public MockScheduledQueryService(String string) {
       stmt = string;
     }
-    
+
     @Override
     public ScheduledQueryPollResponse scheduledQueryPoll() {
 
@@ -129,6 +129,7 @@ public class TestScheduledQueryService {
     public void scheduledQueryProgress(ScheduledQueryProgressInfo info) {
       System.out.printf("%d, state: %s, error: %s", info.getScheduledExecutionId(), info.getState(),
           info.getErrorMessage());
+      lastProgressInfo = info;
       if (info.getState() == QueryState.FINISHED || info.getState() == QueryState.FAILED) {
         // Query is done, notify any waiters
         synchronized (notifier) {
@@ -152,17 +153,21 @@ public class TestScheduledQueryService {
     HiveConf conf = env_setup.getTestCtx().hiveConf;
     MockScheduledQueryService qService = new MockScheduledQueryService("insert into tu values(1),(2),(3),(4),(5)");
     ScheduledQueryExecutionContext ctx = new ScheduledQueryExecutionContext(executor, conf, qService);
-    ScheduledQueryExecutionService sQ = new ScheduledQueryExecutionService(ctx);
+    try (ScheduledQueryExecutionService sQ = ScheduledQueryExecutionService.startScheduledQueryExecutorService(ctx)) {
 
-    executor.shutdown();
-    // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough.
-    SessionState.getConsole().logInfo("Waiting for query execution to finish ...");
-    synchronized (notifier) {
-      notifier.wait(30000);
+      executor.shutdown();
+      // Wait for the scheduled query to finish. Hopefully 30 seconds should be more than enough.
+      SessionState.getConsole().logInfo("Waiting for query execution to finish ...");
+      synchronized (qService.notifier) {
+        qService.notifier.wait(30000);
+      }
+      SessionState.getConsole().logInfo("Done waiting for query execution!");
     }
-    SessionState.getConsole().logInfo("Done waiting for query execution!");
     executor.shutdownNow();
 
+    assertThat(qService.lastProgressInfo.isSetExecutorQueryId(), is(true));
+    assertThat(qService.lastProgressInfo.getExecutorQueryId(),
+        Matchers.containsString(ctx.executorHostName + "/"));
     int nr = getNumRowsReturned(driver, "select 1 from tu");
     assertThat(nr, Matchers.equalTo(5));