You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/07/16 08:52:43 UTC

[hive] branch master updated: HIVE-21988: Do not consider nodes with 0 capacity when calculating host affinity (Peter Vary reviewed by Oliver Draese and Adam Szita)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5676788  HIVE-21988: Do not consider nodes with 0 capacity when calculating host affinity (Peter Vary reviewed by Oliver Draese and Adam Szita)
5676788 is described below

commit 5676788893f264e6f42435100f6f25ba2b6d28b7
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Tue Jul 16 10:51:28 2019 +0200

    HIVE-21988: Do not consider nodes with 0 capacity when calculating host affinity (Peter Vary reviewed by Oliver Draese and Adam Szita)
---
 .../registry/impl/InactiveServiceInstance.java     |   3 +-
 .../llap/registry/impl/LlapFixedRegistryImpl.java  |   8 +-
 .../registry/impl/LlapZookeeperRegistryImpl.java   |   8 +-
 .../tez/HostAffinitySplitLocationProvider.java     |   4 +-
 .../org/apache/hadoop/hive/ql/exec/tez/Utils.java  |  49 ++++---
 .../tez/TestHostAffinitySplitLocationProvider.java |   4 +-
 .../apache/hadoop/hive/ql/exec/tez/TestUtils.java  | 157 +++++++++++++++++++++
 7 files changed, 210 insertions(+), 23 deletions(-)

diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
index 1d6b716..d9c2364 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
@@ -14,6 +14,7 @@
 
 package org.apache.hadoop.hive.llap.registry.impl;
 
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
@@ -62,7 +63,7 @@ public class InactiveServiceInstance implements LlapServiceInstance {
 
   @Override
   public Map<String, String> getProperties() {
-    throw new UnsupportedOperationException();
+    return Collections.emptyMap();
   }
 
   @Override
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 344eba7..2bedb32 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -29,6 +29,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -122,7 +124,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry<LlapServiceInstanc
     return "host-" + host;
   }
 
-  private final class FixedServiceInstance implements LlapServiceInstance {
+  /**
+   * A single instance in an Llap Service.
+   */
+  @VisibleForTesting
+  public final class FixedServiceInstance implements LlapServiceInstance {
 
     private final String host;
     private final String serviceAddress;
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 58a99f4..bf7b76b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -13,6 +13,7 @@
  */
 package org.apache.hadoop.hive.llap.registry.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 
 import com.google.common.collect.Sets;
@@ -186,7 +187,12 @@ public class LlapZookeeperRegistryImpl
     // Nothing for the zkCreate models
   }
 
-  private class DynamicServiceInstance
+  /**
+   * A dynamically changing instance in an Llap Service. Can become inactive if failing or can be
+   * blacklisted (set to 0 capacity) if too slow (See: BlacklistingLlapMetricsListener).
+   */
+  @VisibleForTesting
+  public class DynamicServiceInstance
       extends ServiceInstanceBase implements LlapServiceInstance {
     private final int mngPort;
     private final int shufflePort;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index c5d96e5..a1d422b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -44,9 +44,9 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
 
   private final static Logger LOG = LoggerFactory.getLogger(
       HostAffinitySplitLocationProvider.class);
-  private final boolean isDebugEnabled = LOG.isDebugEnabled();
 
-  private final List<String> locations;
+  @VisibleForTesting
+  final List<String> locations;
 
   public HostAffinitySplitLocationProvider(List<String> knownLocations) {
     Preconditions.checkState(knownLocations != null && !knownLocations.isEmpty(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index 1b7321b..db1a0e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -21,12 +21,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -50,21 +50,7 @@ public class Utils {
     LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations);
     if (useCustomLocations) {
       LlapRegistryService serviceRegistry = LlapRegistryService.getClient(conf);
-      LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
-
-      Collection<LlapServiceInstance> serviceInstances =
-        serviceRegistry.getInstances().getAllInstancesOrdered(true);
-      Preconditions.checkArgument(!serviceInstances.isEmpty(),
-          "No running LLAP daemons! Please check LLAP service status and zookeeper configuration");
-      ArrayList<String> locations = new ArrayList<>(serviceInstances.size());
-      for (LlapServiceInstance serviceInstance : serviceInstances) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" +
-              serviceInstance.getHost() + " to list for split locations");
-        }
-        locations.add(serviceInstance.getHost());
-      }
-      splitLocationProvider = new HostAffinitySplitLocationProvider(locations);
+      return getCustomSplitLocationProvider(serviceRegistry, LOG);
     } else {
       splitLocationProvider = new SplitLocationProvider() {
         @Override
@@ -84,4 +70,35 @@ public class Utils {
     }
     return splitLocationProvider;
   }
+
+  @VisibleForTesting
+  static SplitLocationProvider getCustomSplitLocationProvider(LlapRegistryService serviceRegistry, Logger LOG) throws
+      IOException {
+    LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
+
+    Collection<LlapServiceInstance> serviceInstances =
+        serviceRegistry.getInstances().getAllInstancesOrdered(true);
+    Preconditions.checkArgument(!serviceInstances.isEmpty(),
+        "No running LLAP daemons! Please check LLAP service status and zookeeper configuration");
+    ArrayList<String> locations = new ArrayList<>(serviceInstances.size());
+    for (LlapServiceInstance serviceInstance : serviceInstances) {
+      String executors =
+          serviceInstance.getProperties().get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS);
+      if (executors != null && Integer.parseInt(executors) == 0) {
+        // If the executors are set to 0, add null instead of the host to exclude it from affinity
+        locations.add(null);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Not adding " + serviceInstance.getWorkerIdentity() + " with hostname=" +
+                        serviceInstance.getHost() + " since executor number is 0");
+        }
+      } else {
+        locations.add(serviceInstance.getHost());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" +
+                        serviceInstance.getHost() + " to list for split locations");
+        }
+      }
+    }
+    return new HostAffinitySplitLocationProvider(locations);
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index 13f4676..61c98b7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -306,8 +306,8 @@ public class TestHostAffinitySplitLocationProvider {
     return inputSplit;
   }
 
-  private FileSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
-                                         long length, String[] locations) throws IOException {
+  static FileSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
+                                       long length, String[] locations) throws IOException {
     FileSplit fileSplit;
     if (createOrcSplit) {
       fileSplit = mock(OrcSplit.class);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java
new file mode 100644
index 0000000..b001d02
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Unit tests for the custom split location provider creation in {@link Utils}.
+ */
+public class TestUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
+
+  private static final String INACTIVE = "inactive";
+  private static final String ACTIVE = "dynamic";
+  private static final String DISABLED = "disabled";
+  private static final String FIXED = "fix";
+
+
+  @Mock
+  private LlapRegistryService mockRegistry;
+
+  @Mock
+  private LlapServiceInstanceSet mockInstanceSet;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+  }
+
+  @Test
+  public void testGetSplitLocationProvider() throws IOException, URISyntaxException {
+    // Create test LlapServiceInstances to make sure that we can handle all of the instance types
+    List<LlapServiceInstance> instances = new ArrayList<>(3);
+
+    // Add 1 inactive instance to make sure that it does not cause problems for us
+    LlapServiceInstance inactive = new InactiveServiceInstance(INACTIVE);
+    instances.add(inactive);
+
+    LlapZookeeperRegistryImpl dynRegistry = new LlapZookeeperRegistryImpl("dyn", new HiveConf());
+    Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(ACTIVE, 4000));
+    Endpoint shuffle = RegistryTypeUtils.ipcEndpoint("shuffle", new InetSocketAddress(ACTIVE, 4000));
+    Endpoint mng = RegistryTypeUtils.ipcEndpoint("llapmng", new InetSocketAddress(ACTIVE, 4000));
+    Endpoint outputFormat = RegistryTypeUtils.ipcEndpoint("llapoutputformat", new InetSocketAddress(ACTIVE, 4000));
+    Endpoint services = RegistryTypeUtils.webEndpoint("services", new URI(ACTIVE + ":4000"));
+
+    // Add 1 active instance with 10 enabled executors
+    ServiceRecord enabledSrv = new ServiceRecord();
+    enabledSrv.addInternalEndpoint(rpcEndpoint);
+    enabledSrv.addInternalEndpoint(shuffle);
+    enabledSrv.addInternalEndpoint(mng);
+    enabledSrv.addInternalEndpoint(outputFormat);
+    enabledSrv.addExternalEndpoint(services);
+
+    enabledSrv.set(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, 10);
+    enabledSrv.set(HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, 100);
+    LlapZookeeperRegistryImpl.DynamicServiceInstance dynamic =
+        dynRegistry.new DynamicServiceInstance(enabledSrv);
+    instances.add(dynamic);
+
+    // Add 1 instance with 0 enabled executors, which must not show up in the locations
+    ServiceRecord disabledSrv = new ServiceRecord(enabledSrv);
+    disabledSrv.set(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, 0);
+    LlapZookeeperRegistryImpl.DynamicServiceInstance disabled =
+        dynRegistry.new DynamicServiceInstance(disabledSrv);
+    disabled.setHost(DISABLED);
+    instances.add(disabled);
+
+    when(mockRegistry.getInstances()).thenReturn(mockInstanceSet);
+    when(mockInstanceSet.getAllInstancesOrdered(anyBoolean())).thenReturn(instances);
+    SplitLocationProvider provider = Utils.getCustomSplitLocationProvider(mockRegistry, LOG);
+
+    assertLocations((HostAffinitySplitLocationProvider)provider, new String[] {ACTIVE});
+
+    // Check that instances coming from the fixed registry are handled as well
+    LlapFixedRegistryImpl fixRegistry = new LlapFixedRegistryImpl("llap", new HiveConf());
+
+    // Replace the active dynamic instance with a fixed registry instance
+    LlapServiceInstance fixed = fixRegistry.new FixedServiceInstance(FIXED);
+    instances.remove(dynamic);
+    instances.add(fixed);
+
+    provider = Utils.getCustomSplitLocationProvider(mockRegistry, LOG);
+
+    assertLocations((HostAffinitySplitLocationProvider)provider, new String[] {FIXED});
+  }
+
+  private void assertLocations(HostAffinitySplitLocationProvider provider, String[] expectedLocations)
+      throws IOException {
+    InputSplit inputSplit1 =
+        TestHostAffinitySplitLocationProvider.createMockFileSplit(
+            true, "path2", 0, 1000, new String[]  {"HOST-1", "HOST-2"});
+
+    // Check that the provider does not return disabled/inactive instances and returns only 1 location
+    List<String> result = new ArrayList<>(Arrays.asList(provider.getLocations(inputSplit1)));
+    assertEquals(1, result.size());
+    assertFalse(result.contains(INACTIVE));
+    assertFalse(result.contains(DISABLED));
+
+    // Since we can not check the results for every input, dig into the provider internal data to
+    // make sure that we have only the available host name in the location list
+    // Remove the null placeholders before comparing
+    Set<String> knownLocations = new HashSet<>();
+    knownLocations.addAll(provider.locations);
+    knownLocations.remove(null);
+    assertArrayEquals(expectedLocations, knownLocations.toArray(new String[] {}));
+  }
+}