Posted to commits@hive.apache.org by pv...@apache.org on 2019/07/09 12:59:32 UTC

[hive] branch master updated: HIVE-21909: Publish the LLAP Daemon capacity through ZooKeeper and honor the capacity when scheduling new tasks (Antal Sinkovits reviewed by Oliver Draese and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ba5f0f8  HIVE-21909: Publish the LLAP Daemon capacity through ZooKeeper and honor the capacity when scheduling new tasks (Antal Sinkovits reviewed by Oliver Draese and Peter Vary)
ba5f0f8 is described below

commit ba5f0f883444ad34cb9d2bd883c4602df571dcb1
Author: Antal Sinkovits <as...@cloudera.com>
AuthorDate: Tue Jul 9 14:58:46 2019 +0200

    HIVE-21909: Publish the LLAP Daemon capacity through ZooKeeper and honor the capacity when scheduling new tasks (Antal Sinkovits reviewed by Oliver Draese and Peter Vary)
---
 .../hadoop/hive/llap/registry/ServiceRegistry.java |   6 +
 .../llap/registry/impl/LlapFixedRegistryImpl.java  |   7 +-
 .../llap/registry/impl/LlapRegistryService.java    |  13 +-
 .../registry/impl/LlapZookeeperRegistryImpl.java   |  51 +++++---
 .../impl/TestLlapZookeeperRegistryImpl.java        | 137 +++++++++++++++++++++
 .../hadoop/hive/llap/daemon/ContainerRunner.java   |   2 +-
 .../hadoop/hive/llap/daemon/impl/LlapDaemon.java   |  11 +-
 .../llap/daemon/impl/LlapProtocolServerImpl.java   |   9 +-
 .../hive/llap/daemon/impl/TestLlapDaemon.java      | 127 +++++++++++++++++++
 .../llap/tezplugins/LlapTaskSchedulerService.java  |  85 +++++++------
 .../service/server/HS2ActivePassiveHARegistry.java |   5 +
 11 files changed, 388 insertions(+), 65 deletions(-)
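
As a side note (illustrative only, not part of the patch): the daemon-side flow this change introduces is that, when a SetCapacityRequestProto arrives, the daemon republishes its enabled executor count and wait queue size as registry attributes before delegating to the ContainerRunner, mirroring LlapDaemon#setCapacity in the diff below. The class and method names in this sketch are hypothetical.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;

    // Illustrative sketch only; mirrors LlapDaemon#setCapacity from the diff below.
    class CapacityPublisher {
      private final LlapRegistryService registry;

      CapacityPublisher(LlapRegistryService registry) {
        this.registry = registry;
      }

      // Republish the currently enabled capacity so the Tez-side
      // LlapTaskSchedulerService sees it through its registry listener.
      void publish(int enabledExecutors, int enabledQueueSize) throws IOException {
        Map<String, String> capacity = new HashMap<>(2);
        capacity.put(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS,
            Integer.toString(enabledExecutors));
        capacity.put(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE,
            Integer.toString(enabledQueueSize));
        registry.updateRegistration(capacity.entrySet());
      }
    }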

diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
index 6178b4b..4f9e0da 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.llap.registry;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.hive.registry.ServiceInstance;
 import org.apache.hadoop.hive.registry.ServiceInstanceSet;
@@ -47,6 +48,11 @@ public interface ServiceRegistry<T extends ServiceInstance> {
   void unregister() throws IOException;
 
   /**
+   * Update the current registration with the given attributes.
+   */
+  void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException;
+
+  /**
    * Client API to get the list of instances registered via the current registry key.
    * @param component
    * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index f99d86c..344eba7 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class LlapFixedRegistryImpl implements ServiceRegistry {
+public class LlapFixedRegistryImpl implements ServiceRegistry<LlapServiceInstance> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapFixedRegistryImpl.class);
 
@@ -112,6 +112,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
     // nothing to unregister
   }
 
+  @Override
+  public void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   public static String getWorkerIdentity(String host) {
     // trigger clean errors for anyone who mixes up identity with hosts
     return "host-" + host;
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
index 3bda40b..ea824a1 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
-import org.apache.hadoop.hive.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -34,6 +33,11 @@ import org.slf4j.LoggerFactory;
 
 public class LlapRegistryService extends AbstractService {
 
+  public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE =
+      "hive.llap.daemon.task.scheduler.enabled.wait.queue.size";
+  public static final String LLAP_DAEMON_NUM_ENABLED_EXECUTORS =
+      "hive.llap.daemon.num.enabled.executors";
+
   private static final Logger LOG = LoggerFactory.getLogger(LlapRegistryService.class);
 
   private ServiceRegistry<LlapServiceInstance> registry = null;
@@ -132,6 +136,12 @@ public class LlapRegistryService extends AbstractService {
     }
   }
 
+  public void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException {
+    if (isDaemon && this.registry != null) {
+      this.registry.updateRegistration(attributes);
+    }
+  }
+
   public LlapServiceInstanceSet getInstances() throws IOException {
     return getInstances(0);
   }
@@ -158,4 +168,5 @@ public class LlapRegistryService extends AbstractService {
   public ApplicationId getApplicationId() throws IOException {
     return registry.getApplicationId();
   }
+
 }
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index f5d6202..58a99f4 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -70,6 +70,7 @@ public class LlapZookeeperRegistryImpl
 
 
   private SlotZnode slotZnode;
+  private ServiceRecord daemonZkRecord;
 
   // to be used by clients of ServiceRegistry TODO: this is unnecessary
   private DynamicServiceInstanceSet instances;
@@ -124,23 +125,23 @@ public class LlapZookeeperRegistryImpl
 
   @Override
   public String register() throws IOException {
-    ServiceRecord srv = new ServiceRecord();
+    daemonZkRecord = new ServiceRecord();
     Endpoint rpcEndpoint = getRpcEndpoint();
-    srv.addInternalEndpoint(rpcEndpoint);
-    srv.addInternalEndpoint(getMngEndpoint());
-    srv.addInternalEndpoint(getShuffleEndpoint());
-    srv.addExternalEndpoint(getServicesEndpoint());
-    srv.addInternalEndpoint(getOutputFormatEndpoint());
-
-    for (Map.Entry<String, String> kv : this.conf) {
-      if (kv.getKey().startsWith(HiveConf.PREFIX_LLAP)
-          || kv.getKey().startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
-        // TODO: read this somewhere useful, like the task scheduler
-        srv.set(kv.getKey(), kv.getValue());
-      }
-    }
-
-    String uniqueId = registerServiceRecord(srv);
+    daemonZkRecord.addInternalEndpoint(rpcEndpoint);
+    daemonZkRecord.addInternalEndpoint(getMngEndpoint());
+    daemonZkRecord.addInternalEndpoint(getShuffleEndpoint());
+    daemonZkRecord.addExternalEndpoint(getServicesEndpoint());
+    daemonZkRecord.addInternalEndpoint(getOutputFormatEndpoint());
+
+    populateConfigValues(this.conf);
+    Map<String, String> capacityValues = new HashMap<>(2);
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS,
+            HiveConf.getVarWithoutType(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE,
+            HiveConf.getVarWithoutType(conf, ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE));
+    populateConfigValues(capacityValues.entrySet());
+
+    String uniqueId = registerServiceRecord(daemonZkRecord);
     long znodeCreationTimeout = 120;
 
     // Create a znode under the rootNamespace parent for this instance of the server
@@ -164,6 +165,22 @@ public class LlapZookeeperRegistryImpl
     return uniqueId;
   }
 
+  private void populateConfigValues(Iterable<Map.Entry<String, String>> attributes) {
+    for (Map.Entry<String, String> kv : attributes) {
+      if (kv.getKey().startsWith(HiveConf.PREFIX_LLAP)
+          || kv.getKey().startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
+        // TODO: read this somewhere useful, like the task scheduler
+        daemonZkRecord.set(kv.getKey(), kv.getValue());
+      }
+    }
+  }
+
+  @Override
+  public void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException {
+    populateConfigValues(attributes);
+    updateServiceRecord(this.daemonZkRecord, doCheckAcls, true);
+  }
+
   @Override
   public void unregister() throws IOException {
     // Nothing for the zkCreate models
@@ -197,7 +214,7 @@ public class LlapZookeeperRegistryImpl
       this.serviceAddress =
           RegistryTypeUtils.getAddressField(services.addresses.get(0), AddressTypes.ADDRESS_URI);
       String memStr = srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, "");
-      String coreStr = srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, "");
+      String coreStr = srv.get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, "");
       try {
         this.resource = Resource.newInstance(Integer.parseInt(memStr), Integer.parseInt(coreStr));
       } catch (NumberFormatException ex) {
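
A usage note on the new updateRegistration API (illustrative, not part of the patch): populateConfigValues only copies attributes whose keys start with HiveConf.PREFIX_LLAP or HiveConf.PREFIX_HIVE_LLAP into the daemon's ZooKeeper ServiceRecord, so unrelated keys passed to updateRegistration are dropped, and clients then observe the refreshed values through getInstances(), as TestLlapZookeeperRegistryImpl#testUpdate below verifies. The class and method names in this sketch are hypothetical.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;

    // Illustrative sketch only: which attributes survive updateRegistration().
    class UpdateRegistrationExample {
      static void refreshCapacity(LlapRegistryService registryService) throws IOException {
        Map<String, String> attrs = new HashMap<>();
        attrs.put(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, "4");                   // published
        attrs.put(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE, "40"); // published
        attrs.put("some.unrelated.key", "x");                                                    // filtered out
        // LlapRegistryService forwards this to the underlying ServiceRegistry
        // only when running inside a daemon.
        registryService.updateRegistration(attrs.entrySet());
      }
    }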
diff --git a/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapZookeeperRegistryImpl.java b/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapZookeeperRegistryImpl.java
new file mode 100644
index 0000000..ef62b26
--- /dev/null
+++ b/llap-client/src/test/org/apache/hadoop/hive/llap/registry/impl/TestLlapZookeeperRegistryImpl.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.registry.ServiceInstanceSet;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Fields;
+import org.mockito.internal.util.reflection.InstanceField;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.Integer.parseInt;
+import static org.junit.Assert.assertEquals;
+
+public class TestLlapZookeeperRegistryImpl {
+
+  private HiveConf hiveConf = new HiveConf();
+
+  private LlapZookeeperRegistryImpl registry;
+
+  private CuratorFramework curatorFramework;
+  private TestingServer server;
+
+  @Before
+  public void setUp() throws Exception {
+    registry = new LlapZookeeperRegistryImpl("TestLlapZookeeperRegistryImpl", hiveConf);
+
+    server = new TestingServer();
+    server.start();
+
+    curatorFramework = CuratorFrameworkFactory.
+            builder().
+            connectString(server.getConnectString()).
+            sessionTimeoutMs(1000).
+            retryPolicy(new RetryOneTime(1000)).
+            build();
+    curatorFramework.start();
+
+    trySetMock(registry, CuratorFramework.class, curatorFramework);
+
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    curatorFramework.close();
+    server.stop();
+  }
+
+  @Test
+  public void testRegister() throws Exception {
+    // Given
+    int expectedExecutorCount = HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
+    int expectedQueueSize = HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE);
+
+    // When
+    registry.register();
+    ServiceInstanceSet<LlapServiceInstance> serviceInstanceSet =
+        registry.getInstances("LLAP", 1000);
+
+    // Then
+    Collection<LlapServiceInstance> llaps = serviceInstanceSet.getAll();
+    assertEquals(1, llaps.size());
+    LlapServiceInstance serviceInstance = llaps.iterator().next();
+    Map<String, String> attributes = serviceInstance.getProperties();
+
+    assertEquals(expectedQueueSize,
+        parseInt(attributes.get(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE)));
+    assertEquals(expectedExecutorCount,
+        parseInt(attributes.get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS)));
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    // Given
+    String expectedExecutorCount = "2";
+    String expectedQueueSize = "20";
+    Map<String, String> capacityValues = new HashMap<>(2);
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, expectedExecutorCount);
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE, expectedQueueSize);
+
+    // When
+    registry.register();
+    registry.updateRegistration(capacityValues.entrySet());
+    ServiceInstanceSet<LlapServiceInstance> serviceInstanceSet =
+            registry.getInstances("LLAP", 1000);
+
+    // Then
+    Collection<LlapServiceInstance> llaps = serviceInstanceSet.getAll();
+    assertEquals(1, llaps.size());
+    LlapServiceInstance serviceInstance = llaps.iterator().next();
+    Map<String, String> attributes = serviceInstance.getProperties();
+
+    assertEquals(expectedQueueSize,
+            attributes.get(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE));
+    assertEquals(expectedExecutorCount,
+            attributes.get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS));
+  }
+
+  static <T> void trySetMock(Object o, Class<T> clazz, T mock) {
+    List<InstanceField> instanceFields = Fields
+        .allDeclaredFieldsOf(o)
+        .filter(instanceField -> !clazz.isAssignableFrom(instanceField.jdkField().getType()))
+        .instanceFields();
+    if (instanceFields.size() != 1) {
+      throw new RuntimeException("Mocking is only supported, if only one field is assignable from the given class.");
+    }
+    InstanceField instanceField = instanceFields.get(0);
+    instanceField.set(mock);
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
index 46827bc..bb79720 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
@@ -51,5 +51,5 @@ public interface ContainerRunner {
       UpdateFragmentRequestProto request) throws IOException;
 
   SetCapacityResponseProto setCapacity(
-      SetCapacityRequestProto request);
+      SetCapacityRequestProto request) throws IOException;
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index cbc5336..1b9836b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -24,7 +24,9 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -641,7 +643,14 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
   @Override
   public SetCapacityResponseProto setCapacity(
-      SetCapacityRequestProto request) {
+      SetCapacityRequestProto request) throws IOException {
+
+    Map<String, String> capacityValues = new HashMap<>(2);
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS,
+            Integer.toString(request.getExecutorNum()));
+    capacityValues.put(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE,
+            Integer.toString(request.getQueueSize()));
+    registry.updateRegistration(capacityValues.entrySet());
     return containerRunner.setCapacity(request);
   }
 
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index bb03727..e7e90d3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -331,10 +331,11 @@ public class LlapProtocolServerImpl extends AbstractService
   @Override
   public LlapDaemonProtocolProtos.SetCapacityResponseProto setCapacity(final RpcController controller,
       final LlapDaemonProtocolProtos.SetCapacityRequestProto request) throws ServiceException {
-    LlapDaemonProtocolProtos.SetCapacityResponseProto.Builder responseProtoBuilder =
-        LlapDaemonProtocolProtos.SetCapacityResponseProto.newBuilder();
-    containerRunner.setCapacity(request);
-    return responseProtoBuilder.build();
+    try {
+      return containerRunner.setCapacity(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
   }
 
   private boolean determineIfSigningIsRequired(UserGroupInformation callingUser) {
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java
new file mode 100644
index 0000000..d3817e9
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemon.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
+import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.internal.util.reflection.Fields;
+import org.mockito.internal.util.reflection.InstanceField;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static java.lang.Integer.parseInt;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestLlapDaemon {
+
+  private static final String[] METRICS_SOURCES = new String[]{
+      "JvmMetrics",
+      "LlapDaemonExecutorMetrics-" + MetricsUtils.getHostName(),
+      "LlapDaemonJvmMetrics-" + MetricsUtils.getHostName(),
+      MetricsUtils.METRICS_PROCESS_NAME
+  };
+
+  private Configuration hiveConf = new HiveConf();
+
+  @Mock
+  private LlapRegistryService mockRegistry;
+
+  @Captor
+  private ArgumentCaptor<Iterable<Map.Entry<String, String>>> captor;
+
+  private LlapDaemon daemon;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    HiveConf.setVar(hiveConf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS, "@llap");
+    HiveConf.setVar(hiveConf, HiveConf.ConfVars.HIVE_ZOOKEEPER_QUORUM, "localhost");
+
+    String[] localDirs = new String[1];
+    LlapDaemonInfo.initialize("testDaemon", hiveConf);
+    daemon = new LlapDaemon(hiveConf, 1, LlapDaemon.getTotalHeapSize(), false, false,
+            -1, localDirs, 0, 0, 0, -1, "TestLlapDaemon");
+  }
+
+  @After
+  public void tearDown() {
+    MetricsSystem ms = LlapMetricsSystem.instance();
+    for (String mSource : METRICS_SOURCES) {
+      ms.unregisterSource(mSource);
+    }
+    daemon.shutdown();
+  }
+
+  @Test
+  public void testUpdateRegistration() throws IOException {
+    // Given
+    int enabledExecutors = 0;
+    int enabledQueue = 2;
+    trySetMock(daemon, LlapRegistryService.class, mockRegistry);
+
+    // When
+    daemon.setCapacity(LlapDaemonProtocolProtos.SetCapacityRequestProto.newBuilder()
+        .setQueueSize(enabledQueue)
+        .setExecutorNum(enabledExecutors)
+        .build());
+    verify(mockRegistry).updateRegistration(captor.capture());
+
+    // Then
+    Map<String, String> attributes = StreamSupport.stream(captor.getValue().spliterator(), false)
+        .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+
+    assertTrue(attributes.containsKey(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS));
+    assertTrue(attributes.containsKey(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE));
+    assertEquals(enabledQueue,
+        parseInt(attributes.get(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE)));
+    assertEquals(enabledExecutors,
+        parseInt(attributes.get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS)));
+
+  }
+
+  static <T> void trySetMock(Object o, Class<T> clazz, T mock) {
+    List<InstanceField> instanceFields = Fields
+        .allDeclaredFieldsOf(o)
+        .filter(instanceField -> !clazz.isAssignableFrom(instanceField.jdkField().getType()))
+        .instanceFields();
+    if (instanceFields.size() != 1) {
+      throw new RuntimeException("Mocking is only supported, if only one field is assignable from the given class.");
+    }
+    InstanceField instanceField = instanceFields.get(0);
+    instanceField.set(mock);
+  }
+}
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index a97a934..2ecb7a2 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -822,6 +822,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         amRegistry.register(amPort, pluginPort, HiveConf.getVar(conf, ConfVars.HIVESESSIONID),
             serializedToken, jobIdForToken, 0);
       }
+
+
     } finally {
       writeLock.unlock();
     }
@@ -846,8 +848,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
     @Override
     public void onUpdate(LlapServiceInstance serviceInstance, int ephSeqVersion) {
-      // Registry uses ephemeral sequential znodes that are never updated as of now.
-      LOG.warn("Unexpected update for instance={}. Ignoring", serviceInstance);
+      NodeInfo nodeInfo = instanceToNodeMap.get(serviceInstance.getWorkerIdentity());
+      nodeInfo.updateLlapServiceInstance(serviceInstance, numSchedulableTasksPerNode);
+      LOG.info("Updated node with identity: {} as a result of registry callback",
+              serviceInstance.getWorkerIdentity());
     }
 
     @Override
@@ -2482,7 +2486,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   @VisibleForTesting
   static class NodeInfo implements Delayed {
     private final NodeBlacklistConf blacklistConf;
-    final LlapServiceInstance serviceInstance;
+    LlapServiceInstance serviceInstance;
     private final Clock clock;
 
     long expireTimeMillis = -1;
@@ -2498,13 +2502,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc.
     private boolean disabled = false;
 
-    private int numPreemptedTasks = 0;
     private int numScheduledTasks = 0;
-    private final int numSchedulableTasks;
+    private int numSchedulableTasks;
     private final LlapTaskSchedulerMetrics metrics;
-    private final Resource resourcePerExecutor;
+    private Resource resourcePerExecutor;
 
-    private final String shortStringBase;
+    private String shortStringBase;
 
     /**
      * Create a NodeInfo bound to a service instance
@@ -2518,36 +2521,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     NodeInfo(LlapServiceInstance serviceInstance, NodeBlacklistConf blacklistConf, Clock clock,
         int numSchedulableTasksConf, final LlapTaskSchedulerMetrics metrics) {
       Preconditions.checkArgument(numSchedulableTasksConf >= -1, "NumSchedulableTasks must be >=-1");
-      this.serviceInstance = serviceInstance;
       this.blacklistConf = blacklistConf;
       this.clock = clock;
       this.metrics = metrics;
 
-      int numVcores = serviceInstance.getResource().getVirtualCores();
-      int memoryPerInstance = serviceInstance.getResource().getMemory();
-      int memoryPerExecutor = (int)(memoryPerInstance / (double) numVcores);
-      resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 1);
-
-      if (numSchedulableTasksConf == 0) {
-        int pendingQueueuCapacity = 0;
-        String pendingQueueCapacityString = serviceInstance.getProperties()
-            .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
-        LOG.info("Setting up node: {} with available capacity={}, pendingQueueSize={}, memory={}",
-            serviceInstance, serviceInstance.getResource().getVirtualCores(),
-            pendingQueueCapacityString, serviceInstance.getResource().getMemory());
-        if (pendingQueueCapacityString != null) {
-          pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString);
-        }
-        this.numSchedulableTasks = numVcores + pendingQueueuCapacity;
-      } else {
-        this.numSchedulableTasks = numSchedulableTasksConf;
-        LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
-      }
-      if (metrics != null) {
-        metrics.incrSchedulableTasksCount(numSchedulableTasks);
-      }
-      shortStringBase = setupShortStringBase();
-
+      updateLlapServiceInstance(serviceInstance, numSchedulableTasksConf);
     }
 
     String getNodeIdentity() {
@@ -2570,6 +2548,40 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       return resourcePerExecutor;
     }
 
+    void updateLlapServiceInstance(LlapServiceInstance serviceInstance, int numSchedulableTasksConf) {
+      this.serviceInstance = serviceInstance;
+
+      int numVcores = serviceInstance.getResource().getVirtualCores();
+      int memoryPerInstance = serviceInstance.getResource().getMemory();
+      int memoryPerExecutor = (int)(memoryPerInstance / (double) numVcores);
+      resourcePerExecutor = Resource.newInstance(memoryPerExecutor, 1);
+
+      int oldNumSchedulableTasks = numSchedulableTasks;
+      if (numSchedulableTasksConf == 0) {
+        int pendingQueueuCapacity = 0;
+        String pendingQueueCapacityString = serviceInstance.getProperties()
+                .get(LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE);
+        if (pendingQueueCapacityString == null) {
+          pendingQueueCapacityString = serviceInstance.getProperties()
+                  .get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
+        }
+        LOG.info("Setting up node: {} with available capacity={}, pendingQueueSize={}, memory={}",
+                serviceInstance, serviceInstance.getResource().getVirtualCores(),
+                pendingQueueCapacityString, serviceInstance.getResource().getMemory());
+        if (pendingQueueCapacityString != null) {
+          pendingQueueuCapacity = Integer.parseInt(pendingQueueCapacityString);
+        }
+        this.numSchedulableTasks = numVcores + pendingQueueuCapacity;
+      } else {
+        this.numSchedulableTasks = numSchedulableTasksConf;
+        LOG.info("Setting up node: " + serviceInstance + " with schedulableCapacity=" + this.numSchedulableTasks);
+      }
+      if (metrics != null) {
+        metrics.incrSchedulableTasksCount(numSchedulableTasks - oldNumSchedulableTasks);
+      }
+      shortStringBase = setupShortStringBase();
+    }
+
     void resetExpireInformation() {
       expireTimeMillis = -1;
       hadCommFailure = false;
@@ -2633,7 +2645,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         metrics.incrSchedulableTasksCount();
       }
       if (wasPreempted) {
-        numPreemptedTasks++;
         if (metrics != null) {
           metrics.incrPreemptedTasksCount();
         }
@@ -2660,7 +2671,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
     }
 
-    int canAcceptCounter = 0;
     /* Returning true does not guarantee that the task will run, considering other queries
     may be running in the system. Also depends upon the capacity usage configuration
      */
@@ -2669,11 +2679,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       if (LOG.isTraceEnabled()) {
         LOG.trace(constructCanAcceptLogResult(result));
       }
-      if (canAcceptCounter == 10000) {
-        canAcceptCounter++;
-        LOG.info(constructCanAcceptLogResult(result));
-        canAcceptCounter = 0;
-      }
       return result;
     }
 
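For reference (illustrative, not part of the patch), the capacity arithmetic that NodeInfo#updateLlapServiceInstance above applies when the scheduler-side override numSchedulableTasksConf is 0: the schedulable task count is the advertised vcore count plus the published wait-queue size, preferring the new enabled-wait-queue-size attribute and falling back to the static config key for older daemons. The class and method names in this sketch are hypothetical.

    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
    import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;

    // Illustrative sketch of the capacity computation in NodeInfo#updateLlapServiceInstance.
    class NodeCapacityExample {
      static int schedulableTasks(int numVcores, Map<String, String> daemonProperties) {
        String queueSize = daemonProperties.get(
            LlapRegistryService.LLAP_DAEMON_TASK_SCHEDULER_ENABLED_WAIT_QUEUE_SIZE);
        if (queueSize == null) {
          // Older daemons only publish the static configuration value.
          queueSize = daemonProperties.get(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname);
        }
        int pendingQueueCapacity = (queueSize != null) ? Integer.parseInt(queueSize) : 0;
        // E.g. 4 enabled executors (vcores) and a wait queue of 10 -> capacity 14.
        return numVcores + pendingQueueCapacity;
      }
    }

Note that the scheduler metrics are adjusted by the delta against the previous capacity (numSchedulableTasks - oldNumSchedulableTasks), so repeated registry updates for the same node do not double-count.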
diff --git a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
index f4b4362..ea5965d 100644
--- a/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
+++ b/service/src/java/org/apache/hive/service/server/HS2ActivePassiveHARegistry.java
@@ -140,6 +140,11 @@ public class HS2ActivePassiveHARegistry extends ZkRegistryBase<HiveServer2Instan
     unregisterInternal();
   }
 
+  @Override
+  public void updateRegistration(Iterable<Map.Entry<String, String>> attributes) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   private void populateCache() throws IOException {
     PathChildrenCache pcc = ensureInstancesCache(0);
     populateCache(pcc, false);