You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/07/18 09:41:09 UTC

[hive] branch master updated: HIVE-21912: Implement BlacklistingLlapMetricsListener (Peter Vary reviewed by Oliver Draese and Adam Szita)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 374f361  HIVE-21912: Implement BlacklistingLlapMetricsListener (Peter Vary reviewed by Oliver Draese and Adam Szita)
374f361 is described below

commit 374f361264780a322bd4fc6e40a29606140d517a
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Thu Jul 18 11:40:12 2019 +0200

    HIVE-21912: Implement BlacklistingLlapMetricsListener (Peter Vary reviewed by Oliver Draese and Adam Szita)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  26 +++
 .../llap/registry/impl/LlapRegistryService.java    |  44 ++++-
 .../registry/impl/LlapZookeeperRegistryImpl.java   |  99 +++++++++-
 .../hive/llap/metrics/LlapDaemonExecutorInfo.java  |   0
 .../registry/impl/TestLlapRegistryService.java     | 116 ++++++++++++
 .../hive/llap/registry/impl/package-info.java      |  23 +++
 .../metrics/BlacklistingLlapMetricsListener.java   | 209 +++++++++++++++++++++
 .../LlapManagementProtocolClientImplFactory.java   |  58 ++++++
 .../tezplugins/metrics/LlapMetricsCollector.java   |  55 ++----
 .../TestBlacklistingLlapMetricsListener.java       | 184 ++++++++++++++++++
 .../metrics/TestLlapMetricsCollector.java          |   2 +-
 11 files changed, 758 insertions(+), 58 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3e13785..0f34986 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4358,6 +4358,32 @@ public class HiveConf extends Configuration {
       "The listener which is called when new Llap Daemon statistics is received on AM side.\n" +
       "The listener should implement the " +
       "org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsListener interface."),
+    LLAP_NODEHEALTHCHECKS_MINTASKS(
+      "hive.llap.nodehealthchecks.mintasks", 2000,
+      "Specifies the minimum number of tasks a particular LLAP daemon has to execute before the health\n" +
+      "status of the node is examined."),
+    LLAP_NODEHEALTHCHECKS_MININTERVALDURATION(
+      "hive.llap.nodehealthchecks.minintervalduration", "300s",
+      new TimeValidator(TimeUnit.SECONDS),
+      "The minimum time that needs to elapse between two corrective actions taken as a result of identifying\n" +
+      "an unhealthy node. Even if additional nodes are considered to be unhealthy, no action is performed until\n" +
+      "this time interval has passed since the last corrective action."),
+    LLAP_NODEHEALTHCHECKS_TASKTIMERATIO(
+      "hive.llap.nodehealthchecks.tasktimeratio", 1.5f,
+      "LLAP daemons are considered unhealthy if their average (Map-) task execution time is significantly larger\n" +
+      "than the average task execution time of other nodes. This value specifies the ratio of a node to other\n" +
+      "nodes, which is considered as threshold for unhealthy. A value of 1.5 for example considers a node to be\n" +
+      "unhealthy if its average task execution time is 50% larger than the average of other nodes."),
+    LLAP_NODEHEALTHCHECKS_EXECUTORRATIO(
+      "hive.llap.nodehealthchecks.executorratio", 2.0f,
+      "If an unhealthy node is identified, it is blacklisted only when there are enough free executors to execute\n" +
+      "the tasks. This value specifies the ratio of the free executors compared to the blacklisted ones.\n" +
+      "A value of 2.0 for example defines that we blacklist an unhealthy node only if we have 2 times more\n" +
+      "free executors on the remaining nodes than the unhealthy node."),
+    LLAP_NODEHEALTHCHECKS_MAXNODES(
+      "hive.llap.nodehealthchecks.maxnodes", 1,
+      "The maximum number of blacklisted nodes. If there are at least this number of blacklisted nodes\n" +
+      "the listener will not blacklist further nodes even if all the conditions are met."),
     LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap",
       "AM registry name for LLAP task scheduler plugin to register with."),
     LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "",
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index ea824a1..2d05bda 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -1,16 +1,21 @@
 /*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
+
 package org.apache.hadoop.hive.llap.registry.impl;
 
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
@@ -19,11 +24,13 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -142,6 +149,25 @@ public class LlapRegistryService extends AbstractService {
     }
   }
 
+  /**
+   * Locks the Llap Cluster for configuration change for the given time window.
+   * @param windowStart The beginning of the time window when no other configuration change is allowed.
+   * @param windowEnd The end of the time window when no other configuration change is allowed.
+   * @return The result of the change (success if the lock succeeded, and the next possible
+   * configuration change time)
+   */
+  public ConfigChangeLockResult lockForConfigChange(long windowStart, long windowEnd) {
+    if (this.registry == null) {
+      throw new IllegalStateException("Not allowed to call lockForConfigChange before serviceInit");
+    }
+    if (isDynamic) {
+      LlapZookeeperRegistryImpl zkRegistry = (LlapZookeeperRegistryImpl) registry;
+      return zkRegistry.lockForConfigChange(windowStart, windowEnd);
+    } else {
+      throw new UnsupportedOperationException("Acquiring config lock is only allowed for dynamic registries");
+    }
+  }
+
   public LlapServiceInstanceSet getInstances() throws IOException {
     return getInstances(0);
   }
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index bf7b76b..9e1da9b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -1,19 +1,26 @@
 /*
- * Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
+
 package org.apache.hadoop.hive.llap.registry.impl;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.recipes.atomic.AtomicValue;
+import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 
 import com.google.common.collect.Sets;
@@ -68,6 +75,8 @@ public class LlapZookeeperRegistryImpl
   private final static String NAMESPACE_PREFIX = "llap-";
   private static final String SLOT_PREFIX = "slot-";
   private static final String SASL_LOGIN_CONTEXT_NAME = "LlapZooKeeperClient";
+  private static final String CONFIG_CHANGE_PATH = "config-change"; // dir created next to the workers znode
+  private static final String CONFIG_CHANGE_NODE = "window-end"; // counter storing the end of the lock window
 
 
   private SlotZnode slotZnode;
@@ -76,6 +85,8 @@ public class LlapZookeeperRegistryImpl
   // to be used by clients of ServiceRegistry TODO: this is unnecessary
   private DynamicServiceInstanceSet instances;
 
+  private DistributedAtomicLong lockWindowEnd; // lazily created on the first lockForConfigChange call
+
   public LlapZookeeperRegistryImpl(String instanceName, Configuration conf) {
     super(instanceName, conf,
         HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_NAMESPACE), NAMESPACE_PREFIX,
@@ -439,4 +450,74 @@ public class LlapZookeeperRegistryImpl
     // rather than relying on RegistryUtils.currentUser().
     return HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser());
   }
+
+  /**
+   * Locks the Llap Cluster for configuration change for the given time window.
+   * @param windowStart The beginning of the time window when no other configuration change is allowed.
+   * @param windowEnd The end of the time window when no other configuration change is allowed.
+   * @return The result of the change (success if the lock succeeded, and the next possible
+   * configuration change time, or -1 if it could not be determined)
+   */
+  public ConfigChangeLockResult lockForConfigChange(long windowStart, long windowEnd) {
+    if (windowEnd < windowStart) {
+      throw new IllegalArgumentException(
+          "WindowEnd=" + windowEnd + " can not be smaller than WindowStart=" + windowStart);
+    }
+    try {
+      if (lockWindowEnd == null) {
+        // Create <parent of workersPath>/config-change/window-end; the retry policy never retries
+        lockWindowEnd = new DistributedAtomicLong(zooKeeperClient,
+            String.join("/", workersPath.substring(0, workersPath.lastIndexOf('/')), CONFIG_CHANGE_PATH,
+                CONFIG_CHANGE_NODE), (i, j, sleeper) -> false);
+        lockWindowEnd.initialize(0L);
+      }
+      AtomicValue<Long> current = lockWindowEnd.get();
+      if (!current.succeeded()) {
+        LOG.debug("Can not get the current configuration lock time");
+        return new ConfigChangeLockResult(false, -1L);
+      }
+      if (current.postValue() > windowStart) {
+        LOG.debug("Can not lock window {}-{}. Current value is {}.", windowStart, windowEnd, current.postValue());
+        return new ConfigChangeLockResult(false, current.postValue());
+      }
+      current = lockWindowEnd.compareAndSet(current.postValue(), windowEnd);
+      if (!current.succeeded()) {
+        LOG.debug("Can not lock window {}-{}. Current value is changed to {}.", windowStart, windowEnd,
+            current.preValue()); // on a failed CAS only preValue holds the value read from ZooKeeper
+        return new ConfigChangeLockResult(false, current.preValue());
+      }
+      return new ConfigChangeLockResult(true, current.postValue());
+    } catch (Exception e) {
+      LOG.info("Can not reserve configuration change lock", e);
+      return new ConfigChangeLockResult(false, -1L);
+    }
+  }
+
+  /**
+   * The return data of a config change. Successful or not successful and the next time a config
+   * change can be attempted.
+   */
+  public static class ConfigChangeLockResult {
+    private final boolean success;
+    private final long nextConfigChangeTime;
+
+    @VisibleForTesting
+    public ConfigChangeLockResult(boolean success, long nextConfigChangeTime) {
+      this.success = success;
+      this.nextConfigChangeTime = nextConfigChangeTime;
+    }
+
+    public boolean isSuccess() {
+      return success;
+    }
+
+    public long getNextConfigChangeTime() {
+      return nextConfigChangeTime;
+    }
+
+    @Override
+    public String toString() {
+      return "ConfigChangeLockResult [" + success + "," + nextConfigChangeTime + "]";
+    }
+  }
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java b/llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
similarity index 100%
rename from llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
rename to llap-common/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java
new file mode 100644
index 0000000..065160b
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapRegistryService.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+
+/**
+ * Llap Registry service related tests. Currently only for configuration change.
+ */
+public class TestLlapRegistryService {
+  private static MiniLlapCluster cluster = null;
+  private static HiveConf conf = new HiveConf();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = MiniLlapCluster.create("llap01", null, 1, 2L, false, false, 1L, 1);
+    HiveConf.setVar(conf, HiveConf.ConfVars.LLAP_DAEMON_XMX_HEADROOM, "1");
+    cluster.serviceInit(conf);
+    cluster.serviceStart();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.serviceStop();
+    }
+  }
+
+  @Test
+  public void testLockForConfigChange() throws IOException {
+    LlapRegistryService client1 = null;
+    LlapRegistryService client2 = null;
+    ConfigChangeLockResult result;
+
+    try {
+      client1 = new LlapRegistryService(false);
+      client1.init(conf);
+      client1.start();
+
+      client2 = new LlapRegistryService(false);
+      client2.init(conf);
+      client2.start();
+
+      assertTrue(client1.lockForConfigChange(10000, 20000).isSuccess());
+      assertTrue(client2.lockForConfigChange(30000, 40000).isSuccess());
+
+      // Can not set to before
+      result = client1.lockForConfigChange(20000, 30000);
+      assertFalse(result.isSuccess());
+      assertEquals(40000, result.getNextConfigChangeTime());
+
+      result = client1.lockForConfigChange(30000, 40000);
+      assertFalse(result.isSuccess());
+      assertEquals(40000, result.getNextConfigChangeTime());
+
+      result = client1.lockForConfigChange(35000, 45000);
+      assertFalse(result.isSuccess());
+      assertEquals(40000, result.getNextConfigChangeTime());
+
+      // Can start from the previous end timestamp
+      result = client1.lockForConfigChange(40000, 50000);
+      assertTrue(result.isSuccess());
+      assertEquals(50000, result.getNextConfigChangeTime());
+    } finally {
+      if (client1 != null) {
+        client1.close();
+      }
+      if (client2 != null) {
+        client2.close();
+      }
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testLockForConfigChangeInvalid() throws IOException {
+    LlapRegistryService client = null;
+
+    try {
+      client = new LlapRegistryService(false);
+      client.init(conf);
+      client.start();
+
+      client.lockForConfigChange(20000, 10000);
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+}
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java b/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java
new file mode 100644
index 0000000..d2acc5d
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/registry/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Test classes for registry implementations.
+ */
+
+package org.apache.hadoop.hive.llap.registry.impl;
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java
new file mode 100644
index 0000000..61b80f1
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/BlacklistingLlapMetricsListener.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of LlapMetricsListener which blacklists the slowest node based on the statistics.
+ */
+public class BlacklistingLlapMetricsListener implements LlapMetricsListener {
+  private static final Logger LOG = LoggerFactory.getLogger(BlacklistingLlapMetricsListener.class);
+  private LlapRegistryService registry;
+  private LlapManagementProtocolClientImplFactory clientFactory;
+  private int minServedTasksNumber;
+  private int maxBlacklistedNodes;
+  private long minConfigChangeDelayMs;
+  private float timeThreshold;
+  private float emptyExecutorsThreshold;
+  @VisibleForTesting
+  long nextCheckTime = Long.MIN_VALUE;
+
+  @VisibleForTesting
+  void init(Configuration conf, LlapRegistryService registry, LlapManagementProtocolClientImplFactory clientFactory) {
+    this.registry = registry;
+    this.clientFactory = clientFactory;
+    this.minServedTasksNumber =
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_MINTASKS);
+    this.minConfigChangeDelayMs =
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_MININTERVALDURATION,
+            TimeUnit.MILLISECONDS);
+    this.timeThreshold =
+        HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_TASKTIMERATIO);
+    this.emptyExecutorsThreshold =
+        HiveConf.getFloatVar(conf,
+            HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_EXECUTORRATIO);
+    this.maxBlacklistedNodes =
+        HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_NODEHEALTHCHECKS_MAXNODES);
+
+    Preconditions.checkArgument(minServedTasksNumber > 0,
+        "Minimum served tasks should be greater than 0");
+    Preconditions.checkArgument(minConfigChangeDelayMs > 0,
+        "Minimum config change delay should be greater than 0");
+    Preconditions.checkArgument(timeThreshold > 1.0f,
+        "The time threshold should be greater than 1");
+    Preconditions.checkArgument(maxBlacklistedNodes > 0,
+        "The maximum number of blacklisted nodes should be greater than 0");
+    Preconditions.checkNotNull(registry, "Registry should not be null");
+    Preconditions.checkNotNull(clientFactory, "ClientFactory should not be null");
+
+    LOG.info("BlacklistingLlapMetricsListener initialized with " +
+                  "minServedTasksNumber={}, " +
+                  "minConfigChangeDelayMs={}, " +
+                  "timeThreshold={}, " +
+                  "emptyExecutorsThreshold={}, " +
+                  "maxBlacklistedNodes={}",
+        minServedTasksNumber, minConfigChangeDelayMs, timeThreshold, emptyExecutorsThreshold, maxBlacklistedNodes);
+  }
+
+  @Override
+  public void init(Configuration conf, LlapRegistryService registry) {
+    init(conf, registry, LlapManagementProtocolClientImplFactory.basicInstance(conf));
+  }
+
+  @Override
+  public void newDaemonMetrics(String workerIdentity, LlapMetricsCollector.LlapMetrics newMetrics) {
+    // no op
+  }
+
+  @Override
+  public void newClusterMetrics(Map<String, LlapMetricsCollector.LlapMetrics> newMetrics) {
+    long sumAverageTime = 0;
+    long sumEmptyExecutors = 0;
+    long maxAverageTime = 0;
+    long maxAverageTimeEmptyExecutors = 0;
+    long maxAverageTimeMaxExecutors = 0;
+    long workerNum = 0;
+    int blacklistedNodes = 0;
+    String maxAverageTimeIdentity = null;
+    for (String workerIdentity : newMetrics.keySet()) {
+      Map<String, Long> metrics = newMetrics.get(workerIdentity).getMetrics();
+      long requestHandled = metrics.get(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name());
+      long averageTime = metrics.get(LlapDaemonExecutorInfo.AverageResponseTime.name());
+      long emptyExecutor =
+          metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name());
+      long maxExecutors = metrics.get(LlapDaemonExecutorInfo.ExecutorNumExecutors.name());
+
+      LOG.debug("Checking node {} with data: " +
+                    "requestHandled={}, " +
+                    "averageTime={}, " +
+                    "emptyExecutors={}, " +
+                    "maxExecutors={}",
+          workerIdentity, requestHandled, averageTime, emptyExecutor, maxExecutors);
+
+      if (maxExecutors == 0) {
+        blacklistedNodes++;
+        if (blacklistedNodes >= this.maxBlacklistedNodes) {
+          LOG.info("Already enough blacklisted nodes {}. Skipping.", blacklistedNodes);
+          return;
+        } else {
+          // We are not interested in the data of the blacklisted nodes
+          continue;
+        }
+      }
+
+      if (requestHandled > this.minServedTasksNumber) {
+        workerNum++;
+        sumAverageTime += averageTime;
+        if (averageTime > maxAverageTime) {
+          maxAverageTime = averageTime;
+          maxAverageTimeEmptyExecutors = emptyExecutor;
+          maxAverageTimeMaxExecutors = maxExecutors;
+          maxAverageTimeIdentity = workerIdentity;
+        }
+        sumEmptyExecutors += emptyExecutor;
+      }
+    }
+
+    // If we do not have enough data then return.
+    if (workerNum < 2) {
+      return;
+    }
+
+    LOG.debug("Found slowest node with data: " +
+                  "sumAverageTime={}, " +
+                  "sumEmptyExecutors={}, " +
+                  "maxAverageTime={}, " +
+                  "maxAverageTimeEmptyExecutors={}, " +
+                  "maxAverageTimeMaxExecutors={}, " +
+                  "workerNum={}, " +
+                  "maxAverageTimeIdentity={}, " +
+                  "blacklistedNodes={}",
+        sumAverageTime, sumEmptyExecutors, maxAverageTime, maxAverageTimeEmptyExecutors,
+        maxAverageTimeMaxExecutors, workerNum, maxAverageTimeIdentity, blacklistedNodes);
+    // Check if the slowest node is at least timeThreshold times slower than the average
+    double averageTimeWithoutSlowest = (double)(sumAverageTime - maxAverageTime) / (workerNum - 1);
+    if (averageTimeWithoutSlowest * this.timeThreshold < maxAverageTime) {
+      // We have a candidate, let's see if we have enough empty executors.
+      long emptyExecutorsWithoutSlowest = sumEmptyExecutors - maxAverageTimeEmptyExecutors;
+      if (emptyExecutorsWithoutSlowest > maxAverageTimeMaxExecutors * this.emptyExecutorsThreshold) {
+        // Seems like a good candidate, let's try to blacklist
+        try {
+          LOG.debug("Trying to blacklist node: {}", maxAverageTimeIdentity);
+          setCapacity(maxAverageTimeIdentity, 0, 0);
+        } catch (Throwable t) {
+          LOG.warn("Can not blacklist node: {}", maxAverageTimeIdentity, t);
+        }
+      }
+    }
+  }
+
+  protected void setCapacity(String workerIdentity, int newExecutorNum, int newWaitQueueSize)
+      throws IOException, ServiceException {
+    long currentTime = System.currentTimeMillis();
+    if (currentTime > nextCheckTime) {
+      LlapZookeeperRegistryImpl.ConfigChangeLockResult lockResult =
+          registry.lockForConfigChange(currentTime, currentTime + this.minConfigChangeDelayMs);
+
+      LOG.debug("Got result for lock check: {}", lockResult);
+      if (lockResult.isSuccess()) {
+        LOG.info("Setting capacity for workerIdentity={} to newExecutorNum={}, newWaitQueueSize={}",
+            workerIdentity, newExecutorNum, newWaitQueueSize);
+        LlapServiceInstance serviceInstance = registry.getInstances().getInstance(workerIdentity);
+        LlapManagementProtocolClientImpl client = clientFactory.create(serviceInstance);
+        client.setCapacity(null,
+            SetCapacityRequestProto.newBuilder()
+                .setExecutorNum(newExecutorNum)
+                .setQueueSize(newWaitQueueSize)
+                .build());
+      }
+      if (lockResult.getNextConfigChangeTime() > -1L) {
+        nextCheckTime = lockResult.getNextConfigChangeTime();
+      }
+    } else {
+      LOG.debug("Skipping check. Current time {} and we are waiting for {}.", currentTime, nextCheckTime);
+    }
+  }
+}
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java
new file mode 100644
index 0000000..22f0824
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapManagementProtocolClientImplFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+
+import javax.net.SocketFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance.
+ */
+public class LlapManagementProtocolClientImplFactory {
+  private final Configuration conf;
+  private final RetryPolicy retryPolicy;
+  private final SocketFactory socketFactory;
+
+  public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy,
+                                                 SocketFactory socketFactory) {
+    this.conf = conf;
+    this.retryPolicy = retryPolicy;
+    this.socketFactory = socketFactory;
+  }
+
+  public static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) {
+    return new LlapManagementProtocolClientImplFactory(
+        conf,
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS),
+        NetUtils.getDefaultSocketFactory(conf));
+  }
+
+  public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) {
+    return new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(),
+        serviceInstance.getManagementPort(), retryPolicy,
+        socketFactory);
+  }
+}
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
index 2ca7ed6..0dcb490 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapMetricsCollector.java
@@ -29,16 +29,11 @@ import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.ServiceStateChangeListener;
-import org.apache.logging.log4j.util.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.SocketFactory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -95,7 +90,7 @@ public class LlapMetricsCollector implements ServiceStateChangeListener,
             HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_MS, TimeUnit.MILLISECONDS);
     String listenerClass = HiveConf.getVar(conf,
         HiveConf.ConfVars.LLAP_TASK_SCHEDULER_AM_COLLECT_DAEMON_METRICS_LISTENER);
-    if (Strings.isBlank(listenerClass)) {
+    if (listenerClass == null || listenerClass.isEmpty()) {
       listener = null;
     } else {
       try {
@@ -211,52 +206,34 @@ public class LlapMetricsCollector implements ServiceStateChangeListener,
   }
 
   /**
-   * Creates a LlapManagementProtocolClientImpl from a given LlapServiceInstance.
-   */
-  public static class LlapManagementProtocolClientImplFactory {
-    private final Configuration conf;
-    private final RetryPolicy retryPolicy;
-    private final SocketFactory socketFactory;
-
-    public LlapManagementProtocolClientImplFactory(Configuration conf, RetryPolicy retryPolicy,
-                                                   SocketFactory socketFactory) {
-      this.conf = conf;
-      this.retryPolicy = retryPolicy;
-      this.socketFactory = socketFactory;
-    }
-
-    private static LlapManagementProtocolClientImplFactory basicInstance(Configuration conf) {
-      return new LlapManagementProtocolClientImplFactory(
-              conf,
-              RetryPolicies.retryUpToMaximumCountWithFixedSleep(5, 3000L, TimeUnit.MILLISECONDS),
-              NetUtils.getDefaultSocketFactory(conf));
-    }
-
-    public LlapManagementProtocolClientImpl create(LlapServiceInstance serviceInstance) {
-      LlapManagementProtocolClientImpl client = new LlapManagementProtocolClientImpl(conf, serviceInstance.getHost(),
-              serviceInstance.getManagementPort(), retryPolicy,
-              socketFactory);
-      return client;
-    }
-  }
-
-  /**
    * Stores the metrics retrieved from the llap daemons, along with the retrieval timestamp.
    */
   public static class LlapMetrics {
     private final long timestamp;
-    private final LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics;
+    private final Map<String, Long> metrics;
+
+    @VisibleForTesting // lets tests build metrics without a proto response
+    LlapMetrics(long timestamp, Map<String, Long> metrics) { // keeps the given map as-is, no copy
+      this.timestamp = timestamp;
+      this.metrics = metrics;
+    }
 
     public LlapMetrics(LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto metrics) {
       this.timestamp = System.currentTimeMillis();
-      this.metrics = metrics;
+      this.metrics = new HashMap<String, Long>(metrics.getMetricsCount()); // filled from the proto list
+      metrics.getMetricsList().forEach(entry -> this.metrics.put(entry.getKey(), entry.getValue()));
     }
 
     public long getTimestamp() {
       return timestamp;
     }
 
-    public LlapDaemonProtocolProtos.GetDaemonMetricsResponseProto getMetrics() {
+    /**
+     * Returns the metric values keyed by metric name (the LlapDaemonExecutorInfo enum names).
+     * The map is the instance held by this object, not a copy, so changes to it are visible here.
+     * @return The metric map
+     */
+    public Map<String, Long> getMetrics() {
       return metrics;
     }
   }
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java
new file mode 100644
index 0000000..dec7586
--- /dev/null
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestBlacklistingLlapMetricsListener.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.metrics;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SetCapacityResponseProto;
+import org.apache.hadoop.hive.llap.impl.LlapManagementProtocolClientImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl.ConfigChangeLockResult;
+import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapMetricsCollector.LlapMetrics;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Unit tests for BlacklistingLlapMetricsListener with a mocked registry, client factory and client.
+ */
+public class TestBlacklistingLlapMetricsListener {
+  private static final SetCapacityResponseProto TEST_RESPONSE =
+      SetCapacityResponseProto.getDefaultInstance();
+
+  private BlacklistingLlapMetricsListener listener;
+
+  private Configuration conf;
+
+  @Mock
+  private LlapRegistryService mockRegistry;
+
+  @Mock
+  private LlapManagementProtocolClientImplFactory mockClientFactory;
+
+  @Mock
+  private LlapManagementProtocolClientImpl mockClient;
+
+  @Mock
+  private LlapServiceInstanceSet mockInstanceSet;
+
+  @Before
+  public void setUp() throws Exception {
+    initMocks(this);
+
+    conf = new HiveConf();
+    when(mockRegistry.getInstances()).thenReturn(mockInstanceSet);
+    when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn(
+        new ConfigChangeLockResult(true, Long.MIN_VALUE)); // lock granted, negative next time
+    when(mockClientFactory.create(any(LlapServiceInstance.class))).thenReturn(mockClient);
+    when(mockClient.setCapacity(
+        any(RpcController.class),
+        any(SetCapacityRequestProto.class))).thenReturn(TEST_RESPONSE);
+
+    listener = new BlacklistingLlapMetricsListener();
+    listener.init(conf, mockRegistry, mockClientFactory);
+  }
+
+  @Test(timeout = 2000)
+  public void testBlacklist() throws ServiceException { // node "3" answers twice as slowly
+    Map<String, LlapMetrics> data = generateClusterMetrics();
+    listener.newClusterMetrics(data);
+
+    // Then: exactly one capacity change, aimed at the slow node "3"
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+
+    verify(mockClient, times(1)).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+    verify(mockInstanceSet, times(1)).getInstance(argumentCaptor.capture());
+    assertEquals("3", argumentCaptor.getValue());
+  }
+
+  @Test(timeout = 2000)
+  public void testNotEnoughCapacity() throws ServiceException { // node "0" avg available: 7 -> 5
+    Map<String, LlapMetrics> data = generateClusterMetrics();
+    data.get("0").getMetrics().put(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name(), 5L);
+    listener.newClusterMetrics(data);
+
+    verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+  }
+
+  @Test(timeout = 2000)
+  public void testNoOutstandingResponseTime() throws ServiceException { // node "3": 2000 -> 1500
+    Map<String, LlapMetrics> data = generateClusterMetrics();
+    data.get("3").getMetrics().put(LlapDaemonExecutorInfo.AverageResponseTime.name(), 1500L);
+    listener.newClusterMetrics(data);
+
+    verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+  }
+
+  @Test(timeout = 2000)
+  public void testAlreadyBlacklisted() throws ServiceException { // node "3" already has 0 executors
+    Map<String, LlapMetrics> data = generateClusterMetrics();
+    data.get("3").getMetrics().put(LlapDaemonExecutorInfo.ExecutorNumExecutors.name(), 0L);
+    listener.newClusterMetrics(data);
+
+    verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+  }
+
+  @Test(timeout = 2000)
+  public void testCheckTime() throws Exception {
+    Map<String, LlapMetrics> data = generateClusterMetrics();
+
+    // The registry refuses the lock and reports targetTime as the next allowed change
+    long targetTime = System.currentTimeMillis() + 10000;
+    when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn(
+        new ConfigChangeLockResult(false, targetTime));
+    listener.newClusterMetrics(data);
+
+    verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+    assertEquals(targetTime, listener.nextCheckTime);
+
+    // Clear the registry mock (stubs and recorded calls), then re-stub getInstances()
+    reset(mockRegistry);
+    when(mockRegistry.getInstances()).thenReturn(mockInstanceSet);
+
+    // Before targetTime is reached we neither ask for the lock nor set the capacity
+    listener.newClusterMetrics(data);
+    verify(mockRegistry, never()).lockForConfigChange(anyLong(), anyLong());
+    verify(mockClient, never()).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+
+    // Once the check time has passed, we take the lock and blacklist node "3"
+    listener.nextCheckTime = System.currentTimeMillis() - 1;
+    when(mockRegistry.lockForConfigChange(anyLong(), anyLong())).thenReturn(
+        new ConfigChangeLockResult(true, targetTime));
+    listener.newClusterMetrics(data);
+
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+    verify(mockClient, times(1)).setCapacity(any(RpcController.class), any(SetCapacityRequestProto.class));
+    verify(mockInstanceSet, times(1)).getInstance(argumentCaptor.capture());
+    assertEquals("3", argumentCaptor.getValue());
+  }
+
+  private Map<String, LlapMetrics> generateClusterMetrics() {
+    Map<String, LlapMetrics> data = new HashMap<>(4);
+    data.put("0", generateSingleNodeMetrics(3000, 1000, 7, 10));
+    data.put("1", generateSingleNodeMetrics(3000, 1000, 7, 10));
+    data.put("2", generateSingleNodeMetrics(3000, 1000, 7, 10));
+    data.put("3", generateSingleNodeMetrics(3000, 2000, 5, 10)); // twice the others' response time
+    return data;
+  }
+
+  private LlapMetrics generateSingleNodeMetrics(long totalRequests, long averageResponseTime,
+      long availableExecutors, long allExecutors) { // builds the 4 metrics the tests read or override
+    Map<String, Long> metricsMap = new HashMap<>(4);
+    metricsMap.put(LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled.name(), totalRequests);
+    metricsMap.put(LlapDaemonExecutorInfo.AverageResponseTime.name(), averageResponseTime);
+    metricsMap.put(LlapDaemonExecutorInfo.ExecutorNumExecutorsAvailableAverage.name(), availableExecutors);
+    metricsMap.put(LlapDaemonExecutorInfo.ExecutorNumExecutors.name(), allExecutors);
+    return new LlapMetrics(0, metricsMap);
+  }
+}
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
index d9e1f71..f212ac6 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/metrics/TestLlapMetricsCollector.java
@@ -63,7 +63,7 @@ public class TestLlapMetricsCollector {
   private Configuration mockConf;
 
   @Mock
-  private LlapMetricsCollector.LlapManagementProtocolClientImplFactory mockClientFactory;
+  private LlapManagementProtocolClientImplFactory mockClientFactory;
 
   @Mock
   private LlapManagementProtocolClientImpl mockClient;