You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text copy.)
Posted to commits@hive.apache.org by pv...@apache.org on 2019/07/03 07:31:22 UTC
[hive] branch master updated: HIVE-21911: Pluggable
LlapMetricsListener on Tez side to disable / resize Daemons (Peter Vary
reviewed by Oliver Draese and Adam Szita)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b1c397f HIVE-21911: Pluggable LlapMetricsListener on Tez side to disable / resize Daemons (Peter Vary reviewed by Oliver Draese and Adam Szita)
b1c397f is described below
commit b1c397f769de348d38f4b8eab217cab8eaefa5e1
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Wed Jul 3 09:30:30 2019 +0200
HIVE-21911: Pluggable LlapMetricsListener on Tez side to disable / resize Daemons (Peter Vary reviewed by Oliver Draese and Adam Szita)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 ++
.../llap/tezplugins/LlapTaskSchedulerService.java | 2 +-
.../tezplugins/metrics/LlapMetricsCollector.java | 57 +++++++++++++--
.../tezplugins/metrics/LlapMetricsListener.java | 47 ++++++++++++
.../metrics/TestLlapMetricsCollector.java | 84 ++++++++++++++++++++++
5 files changed, 187 insertions(+), 8 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 48b49ce..375ceea 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4353,6 +4353,11 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS), "Collect llap daemon metrics in the AM every given milliseconds,\n" +
"so that the AM can use this information, to make better scheduling decisions.\n" +
"If it's set to 0, then the feature is disabled."),
+ LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER(
+ "hive.llap.task.scheduler.am.collect.daemon.metrics.listener", "",
+ "The listener which is called when new Llap Daemon statistics is received on AM side.\n" +
+ "The listener should implement the " +
+ "org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsListener interface."),
LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap",
"AM registry name for LLAP task scheduler plugin to register with."),
LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "",
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 37e2fcd..a97a934 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -404,7 +404,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (HiveConf.getTimeVar(conf,
HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS) > 0) {
- this.llapMetricsCollector = new LlapMetricsCollector(conf);
+ this.llapMetricsCollector = new LlapMetricsCollector(conf, registry);
this.registry.registerServiceListener(llapMetricsCollector);
}
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
index 99a2521..2ca7ed6 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
@@ -1,7 +1,11 @@
/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -30,6 +34,7 @@ import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,26 +63,50 @@ public class LlapMetricsCollector implements ServiceStateChangeListener,
private final Map<String, LlapManagementProtocolClientImpl> llapClients;
private final Map<String, LlapMetrics> instanceStatisticsMap;
private final long metricsCollectionMs;
+ @VisibleForTesting
+ final LlapMetricsListener listener; // null when no listener class is configured
- public LlapMetricsCollector(Configuration conf) {
+ public LlapMetricsCollector(Configuration conf, LlapRegistryService registry) { // registry is handed to the configured LlapMetricsListener's init (if any)
this(
conf,
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(THREAD_NAME)
.build()),
- LlapManagementProtocolClientImplFactory.basicInstance(conf));
+ LlapManagementProtocolClientImplFactory.basicInstance(conf),
+ registry);
}
@VisibleForTesting
LlapMetricsCollector(Configuration conf, ScheduledExecutorService scheduledMetricsExecutor,
LlapManagementProtocolClientImplFactory clientFactory) {
+ this(conf, scheduledMetricsExecutor, clientFactory, null); // no registry: a configured listener's init receives null
+ }
+
+ @VisibleForTesting
+ LlapMetricsCollector(Configuration conf, ScheduledExecutorService scheduledMetricsExecutor,
+ LlapManagementProtocolClientImplFactory clientFactory,
+ LlapRegistryService registry) { // registry may be null (see the 3-arg constructor); used here only for listener.init
this.scheduledMetricsExecutor = scheduledMetricsExecutor;
this.clientFactory = clientFactory;
this.llapClients = new HashMap<>();
this.instanceStatisticsMap = new ConcurrentHashMap<>();
this.metricsCollectionMs = HiveConf.getTimeVar(conf,
HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS);
+ String listenerClass = HiveConf.getVar(conf, // optional LlapMetricsListener implementation class name
+ HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER);
+ if (Strings.isBlank(listenerClass)) { // no listener configured; NOTE(review): log4j-internal Strings util in an slf4j class, consider a JDK check
+ listener = null;
+ } else {
+ try {
+ listener = (LlapMetricsListener)Class.forName(listenerClass.trim()).newInstance(); // NOTE(review): Class#newInstance is deprecated since Java 9, prefer getDeclaredConstructor().newInstance()
+ listener.init(conf, registry);
+ } catch (Exception e) { // wraps class-loading, instantiation, cast and init failures alike
+ throw new IllegalArgumentException("Wrong configuration for "
+ + HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER
+ + " " + listenerClass, e);
+ }
+ }
}
public void start() {
@@ -101,13 +130,27 @@ public class LlapMetricsCollector implements ServiceStateChangeListener,
LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics =
client.getDaemonMetrics(null,
LlapDaemonProtocolProtos.GetDaemonMetricsRequestProto.newBuilder().build());
- instanceStatisticsMap.put(identity, new LlapMetrics(metrics));
-
+ LlapMetrics newMetrics = new LlapMetrics(metrics);
+ instanceStatisticsMap.put(identity, newMetrics);
+ if (listener != null) {
+ try {
+ listener.newDaemonMetrics(identity, newMetrics);
+ } catch (Throwable t) {
+ LOG.warn("LlapMetricsListener thrown an unexpected exception", t);
+ }
+ }
} catch (ServiceException ex) {
LOG.error(ex.getMessage(), ex);
instanceStatisticsMap.remove(identity);
}
}
+ if (listener != null) {
+ try {
+ listener.newClusterMetrics(getMetrics());
+ } catch (Throwable t) {
+ LOG.warn("LlapMetricsListener thrown an unexpected exception", t);
+ }
+ }
}
@Override
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsListener.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsListener.java
new file mode 100644
index 0000000..446100b
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector.LlapMetrics;
+
+import java.util.Map;
+
+/**
+ * Listener notified on the AM side when new Llap Daemon metrics are collected.
+ */
+public interface LlapMetricsListener {
+
+ /**
+ * Initializes the listener with the current configuration.
+ * @param conf The configuration
+ * @param registry The Llap registry service used to access the Llap Daemons
+ */
+ void init(Configuration conf, LlapRegistryService registry);
+
+ /**
+ * Called when new metrics data has arrived for a single Llap Daemon.
+ * @param workerIdentity The worker identity of the Llap Daemon
+ * @param newMetrics The new metrics object
+ */
+ void newDaemonMetrics(String workerIdentity, LlapMetrics newMetrics);
+
+ /**
+ * Called after each collection round with the latest metrics of the Llap Daemons in the cluster.
+ * @param newMetrics The map from worker identity to metrics
+ */
+ void newClusterMetrics(Map<String, LlapMetrics> newMetrics);
+}
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
index 6da4d8c..d9e1f71 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
@@ -29,11 +29,15 @@ import org.junit.Test;
import org.mockito.Mock;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
@@ -73,6 +77,9 @@ public class TestLlapMetricsCollector {
when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.varname,
HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS.defaultStrVal)).thenReturn("30000ms");
+ when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.varname,
+ HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.defaultStrVal))
+ .thenReturn(MockListener.class.getName());
when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient);
when(mockClient.getDaemonMetrics(
any(RpcController.class),
@@ -220,4 +227,81 @@ public class TestLlapMetricsCollector {
.scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
}
+
+ /**
+ * Checks that the configured listener is created, initialized once and notified on every collection round; the mocked config returns MockListener.
+ */
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testListener() {
+ // Given: two mocked Llap Daemon instances
+ LlapServiceInstance mockService1 = mock(LlapServiceInstance.class);
+ when(mockService1.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1);
+ LlapServiceInstance mockService2 = mock(LlapServiceInstance.class);
+ when(mockService2.getWorkerIdentity()).thenReturn(TEST_IDENTITY_2);
+
+ // When: both daemons are registered and three collection rounds run
+ collector.onCreate(mockService1, TEST_SEQ_VERSION);
+ collector.onCreate(mockService2, TEST_SEQ_VERSION);
+ collector.collectMetrics();
+ collector.collectMetrics();
+ collector.collectMetrics();
+
+ // Then: one init, one cluster callback per round, one daemon callback per daemon per round
+ assertNotNull(collector.listener);
+ assertEquals(1, ((MockListener)collector.listener).initCount);
+ assertEquals(3, ((MockListener)collector.listener).fullMetricsCount);
+ assertEquals(6, ((MockListener)collector.listener).daemonMetricsCount);
+ }
+
+ /**
+ * Checks that the collector works without a listener when the listener class is configured as empty.
+ */
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testWithoutListener() {
+ // Given: an empty listener class name, so the rebuilt collector has no listener
+ when(mockConf.get(HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.varname,
+ HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER.defaultStrVal)).thenReturn("");
+ collector = new LlapMetricsCollector(mockConf, mockExecutor, mockClientFactory);
+
+ LlapServiceInstance mockService1 = mock(LlapServiceInstance.class);
+ when(mockService1.getWorkerIdentity()).thenReturn(TEST_IDENTITY_1);
+ LlapServiceInstance mockService2 = mock(LlapServiceInstance.class);
+ when(mockService2.getWorkerIdentity()).thenReturn(TEST_IDENTITY_2);
+
+ // When: start / create / remove / collect must not throw without a listener
+ collector.start();
+ collector.onCreate(mockService1, TEST_SEQ_VERSION);
+ collector.onCreate(mockService2, TEST_SEQ_VERSION);
+ collector.onRemove(mockService2, TEST_SEQ_VERSION);
+ collector.collectMetrics();
+
+ // Then: no listener was instantiated
+ assertNull(collector.listener);
+ }
+
+ /**
+ * Test listener that only counts callback invocations and checks that init() ran first.
+ */
+ static class MockListener implements LlapMetricsListener {
+ int initCount = 0; // number of init() calls
+ int daemonMetricsCount = 0; // number of newDaemonMetrics() calls
+ int fullMetricsCount = 0; // number of newClusterMetrics() calls
+
+ @Override
+ public void init(Configuration configuration, LlapRegistryService registry) {
+ initCount++;
+ }
+
+ @Override
+ public void newDaemonMetrics(String workerIdentity, LlapMetricsCollector.LlapMetrics newMetrics) {
+ assertTrue("Init should be called first", initCount > 0); // NOTE: the collector logs and swallows listener Throwables, so a failure here only shows up via the counters
+ daemonMetricsCount++;
+ }
+
+ @Override
+ public void newClusterMetrics(Map<String, LlapMetricsCollector.LlapMetrics> newMetrics) {
+ assertTrue("Init should be called first", initCount > 0);
+ fullMetricsCount++;
+ }
+ }
}