You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/07/03 07:31:22 UTC

[hive] branch master updated: HIVE-21911: Pluggable LlapMetricsListener on Tez side to disable / resize Daemons (Peter Vary reviewed by Oliver Draese and Adam Szita)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b1c397f  HIVE-21911: Pluggable LlapMetricsListener on Tez side to disable / resize Daemons (Peter Vary reviewed by Oliver Draese and Adam Szita)
b1c397f is described below

commit b1c397f769de348d38f4b8eab217cab8eaefa5e1
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Wed Jul 3 09:30:30 2019 +0200

    HIVE-21911: Pluggable LlapMetricsListener on Tez side to disable / resize Daemons (Peter Vary reviewed by Oliver Draese and Adam Szita)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  5 ++
 .../llap/tezplugins/LlapTaskSchedulerService.java  |  2 +-
 .../tezplugins/metrics/LlapMetricsCollector.java   | 57 +++++++++++++--
 .../tezplugins/metrics/LlapMetricsListener.java    | 47 ++++++++++++
 .../metrics/TestLlapMetricsCollector.java          | 84 ++++++++++++++++++++++
 5 files changed, 187 insertions(+), 8 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 48b49ce..375ceea 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4353,6 +4353,11 @@ public class HiveConf extends Configuration {
       new TimeValidator(TimeUnit.MILLISECONDS), "Collect llap daemon metrics in the AM every given milliseconds,\n" +
       "so that the AM can use this information, to make better scheduling decisions.\n" +
       "If it's set to 0, then the feature is disabled."),
+    LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER(
+      "hive.llap.task.scheduler.am.collect.daemon.metrics.listener", "",
+      "The listener which is called when new Llap Daemon statistics is received on AM side.\n" +
+      "The listener should implement the " +
+      "org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsListener interface."),
     LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap",
       "AM registry name for LLAP task scheduler plugin to register with."),
     LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "",
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 37e2fcd..a97a934 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -404,7 +404,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
     if (HiveConf.getTimeVar(conf,
             HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS) > 0) {
-      this.llapMetricsCollector = new LlapMetricsCollector(conf);
+      this.llapMetricsCollector = new LlapMetricsCollector(conf, registry);
       this.registry.registerServiceListener(llapMetricsCollector);
     }
 
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
index 99a2521..2ca7ed6 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
@@ -1,7 +1,11 @@
 /*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -30,6 +34,7 @@ import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.logging.log4j.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,26 +63,50 @@ public class LlapMetricsCollector implements ServiceStateChangeListener,
   private final Map<String, LlapManagementProtocolClientImpl> llapClients;
   private final Map<String, LlapMetrics> instanceStatisticsMap;
   private final long metricsCollectionMs;
+  @VisibleForTesting
+  final LlapMetricsListener listener;
 
 
-  public LlapMetricsCollector(Configuration conf) {
+  public LlapMetricsCollector(Configuration conf, LlapRegistryService registry) {
     this(
             conf,
             Executors.newSingleThreadScheduledExecutor(
                     new ThreadFactoryBuilder().setDaemon(true).setNameFormat(THREAD_NAME)
                             .build()),
-            LlapManagementProtocolClientImplFactory.basicInstance(conf));
+            LlapManagementProtocolClientImplFactory.basicInstance(conf),
+            registry);
   }
 
   @VisibleForTesting
   LlapMetricsCollector(Configuration conf, ScheduledExecutorService scheduledMetricsExecutor,
                        LlapManagementProtocolClientImplFactory clientFactory) {
+    this(conf, scheduledMetricsExecutor, clientFactory, null);
+  }
+
+  @VisibleForTesting
+  LlapMetricsCollector(Configuration conf, ScheduledExecutorService scheduledMetricsExecutor,
+                       LlapManagementProtocolClientImplFactory clientFactory,
+                       LlapRegistryService registry) {
     this.scheduledMetricsExecutor = scheduledMetricsExecutor;
     this.clientFactory = clientFactory;
     this.llapClients = new HashMap<>();
     this.instanceStatisticsMap = new ConcurrentHashMap<>();
     this.metricsCollectionMs = HiveConf.getTimeVar(conf,
             HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS);
+    String listenerClass = HiveConf.getVar(conf,
+        HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER);
+    if (Strings.isBlank(listenerClass)) {
+      listener = null;
+    } else {
+      try {
+        listener = (LlapMetricsListener)Class.forName(listenerClass.trim()).newInstance();
+        listener.init(conf, registry);
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Wrong configuration for "
+            + HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER
+            + " " + listenerClass, e);
+      }
+    }
   }
 
   public void start() {
@@ -101,13 +130,27 @@ public class LlapMetricsCollector implements ServiceStateChangeListener,
         LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics =
                 client.getDaemonMetrics(null,
                         LlapDaemonProtocolProtos.GetDaemonMetricsRequestProto.newBuilder().build());
-        instanceStatisticsMap.put(identity, new LlapMetrics(metrics));
-
+        LlapMetrics newMetrics = new LlapMetrics(metrics);
+        instanceStatisticsMap.put(identity, newMetrics);
+        if (listener != null) {
+          try {
+            listener.newDaemonMetrics(identity, newMetrics);
+          } catch (Throwable t) {
+            LOG.warn("LlapMetricsListener thrown an unexpected exception", t);
+          }
+        }
       } catch (ServiceException ex) {
         LOG.error(ex.getMessage(), ex);
         instanceStatisticsMap.remove(identity);
       }
     }
+    if (listener != null) {
+      try {
+        listener.newClusterMetrics(getMetrics());
+      } catch (Throwable t) {
+        LOG.warn("LlapMetricsListener thrown an unexpected exception", t);
+      }
+    }
   }
 
   @Override
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsListener.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsListener.java
new file mode 100644
index 0000000..446100b
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector.LlapMetrics;
+
+import java.util.Map;
+
+/**
+ * Listener notified of new Llap Daemon metrics; exceptions from the new*Metrics callbacks are logged.
+ */
+public interface LlapMetricsListener {
+
+  /**
+   * Initializes the listener with the current configuration, right after it is instantiated.
+   * @param conf The configuration
+   * @param registry The Llap registry service to access the Llap Daemons (may be null in tests)
+   */
+  void init(Configuration conf, LlapRegistryService registry);
+
+  /**
+   * Called each time new metrics have been successfully fetched from a single Llap Daemon.
+   * @param workerIdentity The worker identity of the Llap Daemon
+   * @param newMetrics The newly received metrics of that Llap Daemon
+   */
+  void newDaemonMetrics(String workerIdentity, LlapMetrics newMetrics);
+
+  /**
+   * Called at the end of each collection round, once new data has arrived for every active Llap Daemon.
+   * @param newMetrics The map of worker identity -> metrics
+   */
+  void newClusterMetrics(Map<String, LlapMetrics> newMetrics);
+}
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
index 6da4d8c..d9e1f71 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
@@ -29,11 +29,15 @@ import org.junit.Test;
 import org.mockito.Mock;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
@@ -73,6 +77,9 @@ public class TestLlapMetricsCollector {
 
     when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.varname,
             HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.defaultStrVal)).thenReturn("30000ms");
+    when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.varname,
+        HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.defaultStrVal))
+          .thenReturn(MockListener.class.getName());
     when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient);
     when(mockClient.getDaemonMetrics(
             any(RpcController.class),
@@ -220,4 +227,81 @@ public class TestLlapMetricsCollector {
             .scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
 
   }
+
+  /**
+   * Checks that the listener is instantiated, initialized once and called; the setup config names MockListener.
+   */
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testListener() {
+    // Given: two daemons with distinct worker identities
+    LlapServiceInstance mockService1 = mock(LlapServiceInstance.class);
+    when(mockService1.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1);
+    LlapServiceInstance mockService2 = mock(LlapServiceInstance.class);
+    when(mockService2.getWorkerIdentity()).thenReturn(TEST_IDENTITY_2);
+
+    // When: both daemons are registered and three collection rounds run
+    collector.onCreate(mockService1, TEST_SEQ_VERSION);
+    collector.onCreate(mockService2, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+    collector.collectMetrics();
+    collector.collectMetrics();
+
+    // Then: init once, one cluster callback per round (3), one daemon callback per daemon per round (2 x 3)
+    assertNotNull(collector.listener);
+    assertEquals(1, ((MockListener)collector.listener).initCount);
+    assertEquals(3, ((MockListener)collector.listener).fullMetricsCount);
+    assertEquals(6, ((MockListener)collector.listener).daemonMetricsCount);
+  }
+
+  /**
+   * Checks that the collector also works when no listener is configured.
+   */
+  @Test(timeout = DEFAULT_TIMEOUT)
+  public void testWithoutListener() {
+    // Given: an empty listener class name, so the collector is built without a listener
+    when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.varname,
+        HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.defaultStrVal)).thenReturn("");
+    collector = new LlapMetricsCollector(mockConf, mockExecutor, mockClientFactory);
+
+    LlapServiceInstance mockService1 = mock(LlapServiceInstance.class);
+    when(mockService1.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1);
+    LlapServiceInstance mockService2 = mock(LlapServiceInstance.class);
+    when(mockService2.getWorkerIdentity()).thenReturn(TEST_IDENTITY_2);
+
+    // When: start / create / remove / collect all run without throwing despite the missing listener
+    collector.start();
+    collector.onCreate(mockService1, TEST_SEQ_VERSION);
+    collector.onCreate(mockService2, TEST_SEQ_VERSION);
+    collector.onRemove(mockService2, TEST_SEQ_VERSION);
+    collector.collectMetrics();
+
+    // Then: no listener was instantiated
+    assertNull(collector.listener);
+  }
+
+  /**
+   * Test listener that only counts how many times each callback is invoked.
+   */
+  static class MockListener implements LlapMetricsListener {
+    int initCount = 0; // number of init() calls
+    int daemonMetricsCount = 0; // number of newDaemonMetrics() calls
+    int fullMetricsCount = 0; // number of newClusterMetrics() calls
+
+    @Override
+    public void init(Configuration configuration, LlapRegistryService registry) {
+      initCount++;
+    }
+
+    @Override
+    public void newDaemonMetrics(String workerIdentity, LlapMetricsCollector.LlapMetrics newMetrics) {
+      assertTrue("Init should be called first", initCount > 0); // NOTE: collector catches Throwable; only the counter reveals a failure
+      daemonMetricsCount++;
+    }
+
+    @Override
+    public void newClusterMetrics(Map<String, LlapMetricsCollector.LlapMetrics> newMetrics) {
+      assertTrue("Init should be called first", initCount > 0); // NOTE: collector catches Throwable; only the counter reveals a failure
+      fullMetricsCount++;
+    }
+  }
 }