You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/07/18 09:41:09 UTC
[hive] branch master updated: HIVE-21912: Implement
BlacklistingLlapMetricsListener (Peter Vary reviewed by Oliver Draese and
Adam Szita)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 374f361 HIVE-21912: Implement BlacklistingLlapMetricsListener (Peter Vary reviewed by Oliver Draese and Adam Szita)
374f361 is described below
commit 374f361264780a322bd4fc6e40a29606140d517a
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Thu Jul 18 11:40:12 2019 +0200
HIVE-21912: Implement BlacklistingLlapMetricsListener (Peter Vary reviewed by Oliver Draese and Adam Szita)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 26 +++
.../llap/registry/impl/LlapRegistryService.java | 44 ++++-
.../registry/impl/LlapZookeeperRegistryImpl.java | 99 +++++++++-
.../hive/llap/metrics/LlapDaemonExecutorInfo.java | 0
.../registry/impl/TestLlapRegistryService.java | 116 ++++++++++++
.../hive/llap/registry/impl/package-info.java | 23 +++
.../metrics/BlacklistingLlapMetricsListener.java | 209 +++++++++++++++++++++
.../LlapManagementProtocolClientImplFactory.java | 58 ++++++
.../tezplugins/metrics/LlapMetricsCollector.java | 55 ++----
.../TestBlacklistingLlapMetricsListener.java | 184 ++++++++++++++++++
.../metrics/TestLlapMetricsCollector.java | 2 +-
11 files changed, 758 insertions(+), 58 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3e13785..0f34986 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4358,6 +4358,32 @@ public class HiveConf extends Configuration {
"The listener which is called when new Llap Daemon statistics is received on AM side.\n" +
"The listener should implement the " +
"org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsListener interface."),
+ LLAP_NODEHEALTHCHECKS_MINTASKS(
+ "hive.llap.nodehealthchecks.mintasks", 2000,
+ "Specifies the minimum number of tasks a particular LLAP daemon must have executed before the health\n" +
+ "status of the node is examined."),
+ LLAP_NODEHEALTHCHECKS_MININTERVALDURATION(
+ "hive.llap.nodehealthckecks.minintervalduration", "300s", // NOTE: "ckecks" key spelling differs from mintasks
+ new TimeValidator(TimeUnit.SECONDS),
+ "The minimum time that needs to elapse between two corrective actions taken as a result of identifying\n" +
+ "an unhealthy node. Even if additional nodes are considered to be unhealthy, no action is performed until\n" +
+ "this time interval has passed since the last corrective action."),
+ LLAP_NODEHEALTHCHECKS_TASKTIMERATIO(
+ "hive.llap.nodehealthckecks.tasktimeratio", 1.5f,
+ "LLAP daemons are considered unhealthy if their average (Map-) task execution time is significantly larger\n" +
+ "than the average task execution time of other nodes. This value specifies the ratio of a node's average\n" +
+ "task time to that of the other nodes above which the node is considered unhealthy. A value of 1.5 for\n" +
+ "example considers a node to be unhealthy if its average task execution time is 50% larger than the average of other nodes."),
+ LLAP_NODEHEALTHCHECKS_EXECUTORRATIO(
+ "hive.llap.nodehealthckecks.executorratio", 2.0f,
+ "If an unhealthy node is identified, it is blacklisted only when there are enough free executors to execute\n" +
+ "the tasks. This value specifies the ratio of the free executors on the remaining nodes to the executors of the\n" +
+ "unhealthy node. A value of 2.0 for example defines that we blacklist an unhealthy node only if we have 2 times\n" +
+ "more free executors on the remaining nodes than the unhealthy node has executors."),
+ LLAP_NODEHEALTHCHECKS_MAXNODES(
+ "hive.llap.nodehealthckecks.maxnodes", 1,
+ "The maximum number of blacklisted nodes. If there are at least this number of blacklisted nodes,\n" +
+ "the listener will not blacklist further nodes even if all the conditions are met."),
LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap",
"AM registry name for LLAP task scheduler plugin to register with."),
LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "",
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index ea824a1..2d05bda 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -1,16 +1,21 @@
/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
package org.apache.hadoop.hive.llap.registry.impl;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
@@ -19,11 +24,13 @@ import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.service.AbstractService;
@@ -142,6 +149,25 @@ public class LlapRegistryService extends AbstractService {
}
}
+ /**
+ * Locks the LLAP cluster against configuration changes for the given time window.
+ * @param windowStart The beginning of the time window when no other configuration change is allowed.
+ * @param windowEnd The end of the time window when no other configuration change is allowed.
+ * @return The result of the attempt (whether the lock was acquired, and the next possible configuration change time)
+ * @throws IllegalStateException if called before serviceInit
+ * @throws UnsupportedOperationException if the registry is not dynamic
+ */
+ public ConfigChangeLockResult lockForConfigChange(long windowStart, long windowEnd) {
+ if (this.registry == null) {
+ throw new IllegalStateException("Not allowed to call lockForConfigChange before serviceInit");
+ }
+ if (!isDynamic) {
+ throw new UnsupportedOperationException("Acquiring config lock is only allowed for dynamic registries");
+ }
+ LlapZookeeperRegistryImpl zkRegistry = (LlapZookeeperRegistryImpl) registry;
+ return zkRegistry.lockForConfigChange(windowStart, windowEnd);
+ }
+
public LlapServiceInstanceSet getInstances() throws IOException {
return getInstances(0);
}
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index bf7b76b..9e1da9b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -1,19 +1,26 @@
/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
package org.apache.hadoop.hive.llap.registry.impl;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import com.google.common.collect.Sets;
@@ -68,6 +75,8 @@ public class LlapZookeeperRegistryImpl
private final static String NAMESPACE_PREFIX = "llap-";
private static final String SLOT_PREFIX = "slot-";
private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
+ private static final String CONFIG_CHANGE_PATH = "config-change"; // znode created next to the workers path
+ private static final String CONFIG_CHANGE_NODE = "window-end"; // holds the end time of the latest locked window
private SlotZnode slotZnode;
@@ -76,6 +85,8 @@ public class LlapZookeeperRegistryImpl
// to be used by clients of ServiceRegistry TODO: this is unnecessary
private DynamicServiceInstanceSet instances;
+ private DistributedAtomicLong lockWindowEnd; // lazily created by lockForConfigChange; stores the lock window end
+
public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
super(instanceName, conf,
HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX,
@@ -439,4 +450,74 @@ public class LlapZookeeperRegistryImpl
// rather than relying on RegistryUtils.currentUser().
return HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
}
+
+ /**
+ * Locks the LLAP cluster against configuration changes for the given time window.
+ * @param windowStart The beginning of the time window when no other configuration change is allowed.
+ * @param windowEnd The end of the time window when no other configuration change is allowed.
+ * @return The result of the attempt: whether the lock was acquired, and the next possible
+ * configuration change time (-1 if it could not be determined)
+ */
+ public ConfigChangeLockResult lockForConfigChange(long windowStart, long windowEnd) {
+ if (windowEnd < windowStart) {
+ throw new IllegalArgumentException(
+ "WindowEnd=" + windowEnd + " can not be smaller than WindowStart=" + windowStart);
+ }
+ try {
+ if (lockWindowEnd == null) {
+ // Create the <parent of workersPath>/config-change/window-end node; the retry policy never retries
+ lockWindowEnd = new DistributedAtomicLong(zooKeeperClient,
+ String.join("/", workersPath.substring(0, workersPath.lastIndexOf('/')), CONFIG_CHANGE_PATH,
+ CONFIG_CHANGE_NODE), (i, j, sleeper) -> false);
+ lockWindowEnd.initialize(0L);
+ }
+ AtomicValue<Long> current = lockWindowEnd.get();
+ if (!current.succeeded()) {
+ LOG.debug("Can not get the current configuration lock time");
+ return new ConfigChangeLockResult(false, -1L);
+ }
+ if (current.postValue() > windowStart) {
+ LOG.debug("Can not lock window {}-{}. Current value is {}.", windowStart, windowEnd, current.postValue());
+ return new ConfigChangeLockResult(false, current.postValue());
+ }
+ current = lockWindowEnd.compareAndSet(current.postValue(), windowEnd);
+ if (!current.succeeded()) {
+ // Curator does not populate postValue() for a failed compareAndSet; preValue() is the value read from ZK
+ LOG.debug("Can not lock window {}-{}. Current value is changed to {}.", windowStart, windowEnd, current.preValue());
+ return new ConfigChangeLockResult(false, current.preValue());
+ }
+ return new ConfigChangeLockResult(true, current.postValue());
+ } catch (Exception e) {
+ LOG.info("Can not reserve configuration change lock", e);
+ return new ConfigChangeLockResult(false, -1L);
+ }
+ }
+
+ /**
+ * Outcome of an attempt to take the configuration change lock: whether it succeeded, and the
+ * earliest time a further configuration change may be attempted (-1 when it is unknown).
+ */
+ public static class ConfigChangeLockResult {
+ private final boolean locked;
+ private final long nextChangeTime;
+
+ @VisibleForTesting
+ public ConfigChangeLockResult(boolean success, long nextConfigChangeTime) {
+ this.locked = success;
+ this.nextChangeTime = nextConfigChangeTime;
+ }
+
+ /** @return whether the configuration change lock was acquired */
+ public boolean isSuccess() { return locked; }
+
+ /** @return the next possible configuration change time, or -1 when it is unknown */
+ public long getNextConfigChangeTime() { return nextChangeTime; }
+
+ /** @return a compact description, e.g. {@code ConfigChangeLockResult [true,50000]} */
+ @Override
+ public String toString() {
+ return new StringBuilder("ConfigChangeLockResult [").append(locked).append(',')
+ .append(nextChangeTime).append(']').toString();
+ }
+ }
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
similarity index 100%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java
new file mode 100644
index 0000000..065160b
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+
+/**
+ * Llap Registry service related tests. Currently only for configuration change.
+ */
+public class TestLlapRegistryService {
+ private static MiniLlapCluster cluster = null;
+ private static HiveConf conf = new HiveConf();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = MiniLlapCluster.create("llap01", null, 1, 2L, false, false, 1L, 1);
+ HiveConf.setVar(conf, HiveConf.ConfVars.LLAP_DAEMON_XMX_HEADROOM, "1");
+ cluster.serviceInit(conf);
+ cluster.serviceStart();
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (cluster != null) {
+ cluster.serviceStop();
+ }
+ }
+
+ @Test
+ public void testLockForConfigChange() throws IOException {
+ LlapRegistryService client1 = null;
+ LlapRegistryService client2 = null;
+ ConfigChangeLockResult result;
+
+ try {
+ client1 = new LlapRegistryService(false);
+ client1.init(conf);
+ client1.start();
+
+ client2 = new LlapRegistryService(false);
+ client2.init(conf);
+ client2.start();
+
+ assertTrue(client1.lockForConfigChange(10000, 20000).isSuccess());
+ assertTrue(client2.lockForConfigChange(30000, 40000).isSuccess());
+
+ // Can not lock a window that starts before the end of the currently locked window
+ result = client1.lockForConfigChange(20000, 30000);
+ assertFalse(result.isSuccess());
+ assertEquals(40000, result.getNextConfigChangeTime());
+
+ result = client1.lockForConfigChange(30000, 40000);
+ assertFalse(result.isSuccess());
+ assertEquals(40000, result.getNextConfigChangeTime());
+
+ result = client1.lockForConfigChange(35000, 45000);
+ assertFalse(result.isSuccess());
+ assertEquals(40000, result.getNextConfigChangeTime());
+
+ // Can start from the previous end timestamp
+ result = client1.lockForConfigChange(40000, 50000);
+ assertTrue(result.isSuccess());
+ assertEquals(50000, result.getNextConfigChangeTime());
+ } finally {
+ if (client1 != null) {
+ client1.close();
+ }
+ if (client2 != null) {
+ client2.close();
+ }
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testLockForConfigChangeInvalid() throws IOException {
+ LlapRegistryService client = null;
+
+ try {
+ client = new LlapRegistryService(false);
+ client.init(conf);
+ client.start();
+
+ client.lockForConfigChange(20000, 10000);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+}
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java b/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java
new file mode 100644
index 0000000..d2acc5d
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Test classes for registry implementations.
+ */
+
+package org.apache.hadoop.hive.llap.registry.impl;
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java
new file mode 100644
index 0000000..61b80f1
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MetricsListener which blacklists the slowest node (sets its capacity to 0) based on the daemon statistics.
+ */
+public class BlacklistingLlapMetricsListener implements LlapMetricsListener {
+ private static final Logger LOG = LoggerFactory.getLogger(BlacklistingLlapMetricsListener.class);
+ private LlapRegistryService registry;
+ private LlapManagementProtocolClientImplFactory clientFactory;
+ private int minServedTasksNumber;
+ private int maxBlacklistedNodes;
+ private long minConfigChangeDelayMs;
+ private float timeThreshold;
+ private float emptyExecutorsThreshold;
+ @VisibleForTesting
+ long nextCheckTime = Long.MIN_VALUE;
+
+ @VisibleForTesting
+ void init(Configuration conf, LlapRegistryService registry, LlapManagementProtocolClientImplFactory clientFactory) {
+ this.registry = registry;
+ this.clientFactory = clientFactory;
+ this.minServedTasksNumber =
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_MINTASKS);
+ this.minConfigChangeDelayMs =
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_MININTERVALDURATION,
+ TimeUnit.MILLISECONDS);
+ this.timeThreshold =
+ HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_TASKTIMERATIO);
+ this.emptyExecutorsThreshold =
+ HiveConf.getFloatVar(conf,
+ HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_EXECUTORRATIO);
+ this.maxBlacklistedNodes =
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_MAXNODES);
+
+ Preconditions.checkArgument(minServedTasksNumber > 0,
+ "Minimum served tasks should be greater than 0");
+ Preconditions.checkArgument(minConfigChangeDelayMs > 0,
+ "Minimum config change delay should be greater than 0");
+ Preconditions.checkArgument(timeThreshold > 1.0f,
+ "The time threshold should be greater than 1");
+ Preconditions.checkArgument(maxBlacklistedNodes > 0,
+ "The maximum number of blacklisted node should be greater than 0");
+ Preconditions.checkNotNull(registry, "Registry should not be null");
+ Preconditions.checkNotNull(clientFactory, "ClientFactory should not be null");
+
+ LOG.info("BlacklistingLlapMetricsListener initialized with minServedTasksNumber={}, " +
+ "minConfigChangeDelayMs={}, timeThreshold={}, emptyExecutorsThreshold={}, maxBlacklistedNodes={}",
+ minServedTasksNumber, minConfigChangeDelayMs, timeThreshold, emptyExecutorsThreshold, maxBlacklistedNodes);
+ }
+
+ @Override
+ public void init(Configuration conf, LlapRegistryService registry) {
+ init(conf, registry, LlapManagementProtocolClientImplFactory.basicInstance(conf));
+ }
+
+ @Override
+ public void newDaemonMetrics(String workerIdentity, LlapMetricsCollector.LlapMetrics newMetrics) {
+ // no op
+ }
+
+ /**
+ * Blacklists the slowest node if it is timeThreshold times slower than the others and enough free executors remain.
+ */
+ @Override
+ public void newClusterMetrics(Map<String, LlapMetricsCollector.LlapMetrics> newMetrics) {
+ long sumAverageTime = 0;
+ long sumEmptyExecutors = 0;
+ long maxAverageTime = 0;
+ long maxAverageTimeEmptyExecutors = 0;
+ long maxAverageTimeMaxExecutors = 0;
+ long workerNum = 0;
+ int blacklistedNodes = 0;
+ String maxAverageTimeIdentity = null;
+ for (Map.Entry<String, LlapMetricsCollector.LlapMetrics> entry : newMetrics.entrySet()) {
+ String workerIdentity = entry.getKey();
+ Map<String, Long> metrics = entry.getValue().getMetrics();
+ long requestHandled = metrics.get(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name());
+ long averageTime = metrics.get(LlapDaemonExecutorInfo.AverageResponseTime.name());
+ long emptyExecutor =
+ metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name());
+ long maxExecutors = metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutors.name());
+
+ LOG.debug("Checking node {} with data: requestHandled={}, averageTime={}, emptyExecutors={}, " +
+ "maxExecutors={}",
+ workerIdentity, requestHandled, averageTime, emptyExecutor, maxExecutors);
+
+ if (maxExecutors == 0) {
+ blacklistedNodes++;
+ if (blacklistedNodes >= this.maxBlacklistedNodes) {
+ LOG.info("Already enough blacklisted nodes {}. Skipping.", blacklistedNodes);
+ return;
+ } else {
+ // We are not interested in the data of already blacklisted nodes
+ continue;
+ }
+ }
+
+ if (requestHandled > this.minServedTasksNumber) {
+ workerNum++;
+ sumAverageTime += averageTime;
+ if (averageTime > maxAverageTime) {
+ maxAverageTime = averageTime;
+ maxAverageTimeEmptyExecutors = emptyExecutor;
+ maxAverageTimeMaxExecutors = maxExecutors;
+ maxAverageTimeIdentity = workerIdentity;
+ }
+ sumEmptyExecutors += emptyExecutor;
+ }
+ }
+
+ // If we do not have enough data then return.
+ if (workerNum < 2) {
+ return;
+ }
+
+ LOG.debug("Found slowest node {} with data: " +
+ "sumAverageTime={}, " +
+ "sumEmptyExecutors={}, " +
+ "maxAverageTime={}, " +
+ "maxAverageTimeEmptyExecutors={}, " +
+ "maxAverageTimeMaxExecutors={}, " +
+ "workerNum={}, " +
+ "maxAverageTimeIdentity={}, " +
+ "blacklistedNodes={}",
+ maxAverageTimeIdentity, sumAverageTime, sumEmptyExecutors, maxAverageTime, maxAverageTimeEmptyExecutors,
+ maxAverageTimeMaxExecutors, workerNum, maxAverageTimeIdentity, blacklistedNodes);
+ // Check if the slowest node is at least timeThreshold times slower than the average of the other nodes
+ double averageTimeWithoutSlowest = (double)(sumAverageTime - maxAverageTime) / (workerNum - 1);
+ if (averageTimeWithoutSlowest * this.timeThreshold < maxAverageTime) {
+ // We have a candidate, let's see if we have enough empty executors.
+ long emptyExecutorsWithoutSlowest = sumEmptyExecutors - maxAverageTimeEmptyExecutors;
+ if (emptyExecutorsWithoutSlowest > maxAverageTimeMaxExecutors * this.emptyExecutorsThreshold) {
+ // Seems like a good candidate, let's try to blacklist
+ try {
+ LOG.debug("Trying to blacklist node: {}", maxAverageTimeIdentity);
+ setCapacity(maxAverageTimeIdentity, 0, 0);
+ } catch (Exception e) {
+ LOG.info("Can not blacklist node: {}", maxAverageTimeIdentity, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Sets the executor number and wait queue size of the given node, if the config change lock can be acquired.
+ */
+ protected void setCapacity(String workerIdentity, int newExecutorNum, int newWaitQueueSize)
+ throws IOException, ServiceException {
+ long currentTime = System.currentTimeMillis();
+ if (currentTime > nextCheckTime) {
+ LlapZookeeperRegistryImpl.ConfigChangeLockResult lockResult =
+ registry.lockForConfigChange(currentTime, currentTime + this.minConfigChangeDelayMs);
+
+ LOG.debug("Got result for lock check: {}", lockResult);
+ if (lockResult.isSuccess()) {
+ LOG.info("Setting capacity for workerIdentity={} to newExecutorNum={}, newWaitQueueSize={}",
+ workerIdentity, newExecutorNum, newWaitQueueSize);
+ LlapServiceInstance serviceInstance = registry.getInstances().getInstance(workerIdentity);
+ LlapManagementProtocolClientImpl client = clientFactory.create(serviceInstance);
+ client.setCapacity(null,
+ SetCapacityRequestProto.newBuilder()
+ .setExecutorNum(newExecutorNum)
+ .setQueueSize(newWaitQueueSize)
+ .build());
+ }
+ if (lockResult.getNextConfigChangeTime() > -1L) {
+ nextCheckTime = lockResult.getNextConfigChangeTime();
+ }
+ } else {
+ LOG.debug("Skipping check. Current time {} and we are waiting for {}.", currentTime, nextCheckTime);
+ }
+ }
+}
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java
new file mode 100644
index 0000000..22f0824
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+
+import javax.net.SocketFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Factory for LlapManagementProtocolClientImpl instances connecting to a given LlapServiceInstance.
+ */
+public class LlapManagementProtocolClientImplFactory {
+ private final Configuration configuration;
+ private final RetryPolicy clientRetryPolicy;
+ private final SocketFactory clientSocketFactory;
+
+ public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy,
+ SocketFactory socketFactory) {
+ configuration = conf;
+ clientRetryPolicy = retryPolicy;
+ clientSocketFactory = socketFactory;
+ }
+
+ /** Factory using up to 5 retries 3 seconds apart and the default socket factory of the configuration. */
+ public static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) {
+ RetryPolicy retries = RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS);
+ return new LlapManagementProtocolClientImplFactory(conf, retries, NetUtils.getDefaultSocketFactory(conf));
+ }
+
+ /** Creates a client connected to the management port of the given service instance. */
+ public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) {
+ String host = serviceInstance.getHost();
+ int port = serviceInstance.getManagementPort();
+ return new LlapManagementProtocolClientImpl(configuration, host, port, clientRetryPolicy, clientSocketFactory);
+ }
+}
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
index 2ca7ed6..0dcb490 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
@@ -29,16 +29,11 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener;
-import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.SocketFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -95,7 +90,7 @@ public class LlapMetricsCollector implements ServiceStateChangeListener,
HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS);
String listenerClass = HiveConf.getVar(conf,
HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER);
- if (Strings.isBlank(listenerClass)) {
+ if (listenerClass == null || listenerClass.isEmpty()) {
listener = null;
} else {
try {
@@ -211,52 +206,34 @@ public class LlapMetricsCollector implements ServiceStateChangeListener,
}
/**
- * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance.
- */
- public static class LlapManagementProtocolClientImplFactory {
- private final Configuration conf;
- private final RetryPolicy retryPolicy;
- private final SocketFactory socketFactory;
-
- public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy,
- SocketFactory socketFactory) {
- this.conf = conf;
- this.retryPolicy = retryPolicy;
- this.socketFactory = socketFactory;
- }
-
- private static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) {
- return new LlapManagementProtocolClientImplFactory(
- conf,
- RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS),
- NetUtils.getDefaultSocketFactory(conf));
- }
-
- public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) {
- LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(),
- serviceInstance.getManagementPort(), retryPolicy,
- socketFactory);
- return client;
- }
- }
-
- /**
* Stores the metrics retrieved from the llap daemons, along with the retrieval timestamp.
*/
public static class LlapMetrics {
private final long timestamp;
- private final LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics;
+ /** Metric name to metric value; see {@link #getMetrics()}. */
+ private final Map<String, Long> metrics;
+
+ /**
+ * Creates an instance from an already built metric map.
+ * The map is stored as-is (it is NOT copied), so later changes to it are visible through
+ * {@link #getMetrics()}.
+ * @param timestamp The value returned by {@link #getTimestamp()}
+ * @param metrics The metric map, keyed by metric name
+ */
+ @VisibleForTesting
+ LlapMetrics(long timestamp, Map<String, Long> metrics) {
+ this.timestamp = timestamp;
+ this.metrics = metrics;
+ }
+ /**
+ * Creates an instance from a daemon metrics response by copying every key/value entry of the
+ * response into a new map. If the same key appears more than once, the last entry wins.
+ * The timestamp is taken from {@link System#currentTimeMillis()} at construction time.
+ * @param metrics The daemon metrics response to convert
+ */
public LlapMetrics(LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics) {
this.timestamp = System.currentTimeMillis();
- this.metrics = metrics;
+ this.metrics = new HashMap<String, Long>(metrics.getMetricsCount());
+ metrics.getMetricsList().forEach(entry -> this.metrics.put(entry.getKey(), entry.getValue()));
}
+ /**
+ * @return The construction time in milliseconds since the epoch, or the value supplied to the
+ * test-only constructor
+ */
public long getTimestamp() {
return timestamp;
}
- public LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto getMetrics() {
+ /**
+ * The metric values in the map. The keys are the enum names (See: LlapDaemonExecutorInfo), and
+ * the values are the actual values.
+ * The returned map is the internal, mutable instance, not a copy.
+ * @return The metric map
+ */
+ public Map<String, Long> getMetrics() {
return metrics;
}
}
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java
new file mode 100644
index 0000000..dec7586
--- /dev/null
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityResponseProto;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult;
+import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector.LlapMetrics;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Tests for {@link BlacklistingLlapMetricsListener}.
+ * <p>
+ * Every test starts from {@link #generateClusterMetrics()}: four nodes "0".."3" with identical
+ * metrics, except that node "3" has a higher average response time (2000 vs 1000) and fewer
+ * average available executors (5 vs 7). Most tests tweak a single value and then check whether
+ * {@code setCapacity} is called on the mocked management client.
+ */
+public class TestBlacklistingLlapMetricsListener {
+ private static final SetCapacityResponseProto TEST_RESPONSE =
+ SetCapacityResponseProto.getDefaultInstance();
+
+ private BlacklistingLlapMetricsListener listener;
+
+ private Configuration conf;
+
+ @Mock
+ private LlapRegistryService mockRegistry;
+
+ @Mock
+ private LlapManagementProtocolClientImplFactory mockClientFactory;
+
+ @Mock
+ private LlapManagementProtocolClientImpl mockClient;
+
+ @Mock
+ private LlapServiceInstanceSet mockInstanceSet;
+
+ /**
+ * Wires the mocks:
+ * - the registry returns {@code mockInstanceSet};
+ * - the config-change lock is granted;
+ * - every created client is {@code mockClient};
+ * - {@code setCapacity} returns a default response.
+ * A fresh listener is then initialized against these mocks.
+ */
+ @Before
+ public void setUp() throws Exception {
+ initMocks(this);
+
+ conf = new HiveConf();
+ when(mockRegistry.getInstances()).thenReturn(mockInstanceSet);
+ // true: the lock is acquired
+ when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn(
+ new ConfigChangeLockResult(true, Long.MIN_VALUE));
+ when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient);
+ when(mockClient.setCapacity(
+ any(RpcController.class),
+ any(SetCapacityRequestProto.class))).thenReturn(TEST_RESPONSE);
+
+ listener = new BlacklistingLlapMetricsListener();
+ listener.init(conf, mockRegistry, mockClientFactory);
+ }
+
+ /**
+ * With the unmodified cluster data, exactly one {@code setCapacity} call is expected, and the
+ * only instance looked up is node "3".
+ */
+ @Test(timeout = 2000)
+ public void testBlacklist() throws ServiceException {
+ // Given
+ Map<String, LlapMetrics> data = generateClusterMetrics();
+ // When
+ listener.newClusterMetrics(data);
+
+ // Then
+ ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+
+ verify(mockClient, times(1)).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+ verify(mockInstanceSet, times(1)).getInstance(argumentCaptor.capture());
+ assertEquals("3", argumentCaptor.getValue());
+ }
+
+ /**
+ * Lowering node "0"'s average available executors from 7 to 5 must prevent any
+ * {@code setCapacity} call (presumably the rest of the cluster would no longer have enough
+ * spare capacity; see the listener's implementation).
+ */
+ @Test(timeout = 2000)
+ public void testNotEnoughCapacity() throws ServiceException {
+ Map<String, LlapMetrics> data = generateClusterMetrics();
+ data.get("0").getMetrics().put(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name(), 5L);
+ listener.newClusterMetrics(data);
+
+ verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+ }
+
+ /**
+ * Reducing node "3"'s average response time from 2000 to 1500 (others stay at 1000) must
+ * prevent any {@code setCapacity} call (presumably no longer far enough above the others to
+ * count as an outlier).
+ */
+ @Test(timeout = 2000)
+ public void testNoOutstandingResponseTime() throws ServiceException {
+ Map<String, LlapMetrics> data = generateClusterMetrics();
+ data.get("3").getMetrics().put(LlapDaemonExecutorInfo.AverageResponseTime.name(), 1500L);
+ listener.newClusterMetrics(data);
+
+ verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+ }
+
+ /**
+ * A node "3" that already reports 0 executors must not be blacklisted again: no
+ * {@code setCapacity} call is expected.
+ */
+ @Test(timeout = 2000)
+ public void testAlreadyBlacklisted() throws ServiceException {
+ Map<String, LlapMetrics> data = generateClusterMetrics();
+ data.get("3").getMetrics().put(LlapDaemonExecutorInfo.ExecutorNumExecutors.name(), 0L);
+ listener.newClusterMetrics(data);
+
+ verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+ }
+
+ /**
+ * Checks the lock-and-retry flow in three steps:
+ * - A refused config-change lock stores the returned time in {@code listener.nextCheckTime}.
+ * - Until that time passes, the listener neither locks nor calls {@code setCapacity}.
+ * - Once the time has passed, the listener locks and blacklists node "3".
+ */
+ @Test(timeout = 2000)
+ public void testCheckTime() throws Exception {
+ Map<String, LlapMetrics> data = generateClusterMetrics();
+
+ // Return that we can not yet blacklist a node
+ long targetTime = System.currentTimeMillis() + 10000;
+ when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn(
+ new ConfigChangeLockResult(false, targetTime));
+ listener.newClusterMetrics(data);
+
+ verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+ assertEquals(targetTime, listener.nextCheckTime);
+
+ // reset mock stuff
+ // reset() drops all stubbing and recorded calls on mockRegistry, so getInstances() must be
+ // stubbed again and the never() check below only sees calls made after this point
+ reset(mockRegistry);
+ when(mockRegistry.getInstances()).thenReturn(mockInstanceSet);
+
+ // We will not try to set the capacity, or even lock until the time is reached
+ listener.newClusterMetrics(data);
+ verify(mockRegistry, never()).lockForConfigChange(anyLong(), anyLong());
+ verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+
+ // If the time is reached, then we lock and blacklist
+ // (nextCheckTime is moved into the past directly instead of waiting 10 seconds)
+ listener.nextCheckTime = System.currentTimeMillis() - 1;
+ when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn(
+ new ConfigChangeLockResult(true, targetTime));
+ listener.newClusterMetrics(data);
+
+ ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+ verify(mockClient, times(1)).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+ verify(mockInstanceSet, times(1)).getInstance(argumentCaptor.capture());
+ assertEquals("3", argumentCaptor.getValue());
+ }
+
+ /**
+ * Builds metrics for four nodes keyed "0".."3".
+ * Every node reports 3000 total requests and 10 executors.
+ * Nodes "0".."2" report an average response time of 1000 and 7 available executors on average.
+ * Node "3" reports an average response time of 2000 and 5 available executors on average.
+ * @return A mutable map from node id to its metrics
+ */
+ private Map<String, LlapMetrics> generateClusterMetrics() {
+ Map<String, LlapMetrics> data = new HashMap<>(4);
+ data.put("0", generateSingleNodeMetrics(3000, 1000, 7, 10));
+ data.put("1", generateSingleNodeMetrics(3000, 1000, 7, 10));
+ data.put("2", generateSingleNodeMetrics(3000, 1000, 7, 10));
+ data.put("3", generateSingleNodeMetrics(3000, 2000, 5, 10));
+ return data;
+ }
+
+ /**
+ * Builds a LlapMetrics with timestamp 0 that contains exactly the four metrics below.
+ * The map is mutable, so tests can override single values through getMetrics().
+ * @param totalRequests ExecutorTotalRequestsHandled value
+ * @param averageResponseTime AverageResponseTime value
+ * @param availableExecutors ExecutorNumExecutorsAvailableAverage value
+ * @param allExecutors ExecutorNumExecutors value
+ * @return The metrics object
+ */
+ private LlapMetrics generateSingleNodeMetrics(long totalRequests, long averageResponseTime,
+ long availableExecutors, long allExecutors) {
+ Map<String, Long> metricsMap = new HashMap<>(4);
+ metricsMap.put(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name(), totalRequests);
+ metricsMap.put(LlapDaemonExecutorInfo.AverageResponseTime.name(), averageResponseTime);
+ metricsMap.put(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name(), availableExecutors);
+ metricsMap.put(LlapDaemonExecutorInfo.ExecutorNumExecutors.name(), allExecutors);
+ return new LlapMetrics(0, metricsMap);
+ }
+}
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
index d9e1f71..f212ac6 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
@@ -63,7 +63,7 @@ public class TestLlapMetricsCollector {
private Configuration mockConf;
@Mock
- private LlapMetricsCollector.LlapManagementProtocolClientImplFactory mockClientFactory;
+ private LlapManagementProtocolClientImplFactory mockClientFactory;
@Mock
private LlapManagementProtocolClientImpl mockClient;