You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/07/16 08:52:43 UTC
[hive] branch master updated: HIVE-21988: Do not consider nodes
with 0 capacity when calculating host affinity (Peter Vary reviewed by
Oliver Draese and Adam Szita)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 5676788 HIVE-21988: Do not consider nodes with 0 capacity when calculating host affinity (Peter Vary reviewed by Oliver Draese and Adam Szita)
5676788 is described below
commit 5676788893f264e6f42435100f6f25ba2b6d28b7
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Tue Jul 16 10:51:28 2019 +0200
HIVE-21988: Do not consider nodes with 0 capacity when calculating host affinity (Peter Vary reviewed by Oliver Draese and Adam Szita)
---
.../registry/impl/InactiveServiceInstance.java | 3 +-
.../llap/registry/impl/LlapFixedRegistryImpl.java | 8 +-
.../registry/impl/LlapZookeeperRegistryImpl.java | 8 +-
.../tez/HostAffinitySplitLocationProvider.java | 4 +-
.../org/apache/hadoop/hive/ql/exec/tez/Utils.java | 49 ++++---
.../tez/TestHostAffinitySplitLocationProvider.java | 4 +-
.../apache/hadoop/hive/ql/exec/tez/TestUtils.java | 157 +++++++++++++++++++++
7 files changed, 210 insertions(+), 23 deletions(-)
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
index 1d6b716..d9c2364 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.llap.registry.impl;
+import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
@@ -62,7 +63,7 @@ public class InactiveServiceInstance implements LlapServiceInstance {
@Override
public Map<String, String> getProperties() {
- throw new UnsupportedOperationException();
+ return Collections.emptyMap();
}
@Override
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 344eba7..2bedb32 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -29,6 +29,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -122,7 +124,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry<LlapServiceInstanc
return "host-" + host;
}
- private final class FixedServiceInstance implements LlapServiceInstance {
+ /**
+ * A single instance in an Llap Service.
+ */
+ @VisibleForTesting
+ public final class FixedServiceInstance implements LlapServiceInstance {
private final String host;
private final String serviceAddress;
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 58a99f4..bf7b76b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -13,6 +13,7 @@
*/
package org.apache.hadoop.hive.llap.registry.impl;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import com.google.common.collect.Sets;
@@ -186,7 +187,12 @@ public class LlapZookeeperRegistryImpl
// Nothing for the zkCreate models
}
- private class DynamicServiceInstance
+ /**
+ * A dynamically changing instance in an Llap Service. Can become inactive if failing or can be
+ * blacklisted (set to 0 capacity) if too slow (See: BlacklistingLlapMetricsListener).
+ */
+ @VisibleForTesting
+ public class DynamicServiceInstance
extends ServiceInstanceBase implements LlapServiceInstance {
private final int mngPort;
private final int shufflePort;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index c5d96e5..a1d422b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -44,9 +44,9 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
private final static Logger LOG = LoggerFactory.getLogger(
HostAffinitySplitLocationProvider.class);
- private final boolean isDebugEnabled = LOG.isDebugEnabled();
- private final List<String> locations;
+ @VisibleForTesting
+ final List<String> locations;
public HostAffinitySplitLocationProvider(List<String> knownLocations) {
Preconditions.checkState(knownLocations != null && !knownLocations.isEmpty(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index 1b7321b..db1a0e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -21,12 +21,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
-import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -50,21 +50,7 @@ public class Utils {
LOG.info("SplitGenerator using llap affinitized locations: " + useCustomLocations);
if (useCustomLocations) {
LlapRegistryService serviceRegistry = LlapRegistryService.getClient(conf);
- LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
-
- Collection<LlapServiceInstance> serviceInstances =
- serviceRegistry.getInstances().getAllInstancesOrdered(true);
- Preconditions.checkArgument(!serviceInstances.isEmpty(),
- "No running LLAP daemons! Please check LLAP service status and zookeeper configuration");
- ArrayList<String> locations = new ArrayList<>(serviceInstances.size());
- for (LlapServiceInstance serviceInstance : serviceInstances) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" +
- serviceInstance.getHost() + " to list for split locations");
- }
- locations.add(serviceInstance.getHost());
- }
- splitLocationProvider = new HostAffinitySplitLocationProvider(locations);
+ return getCustomSplitLocationProvider(serviceRegistry, LOG);
} else {
splitLocationProvider = new SplitLocationProvider() {
@Override
@@ -84,4 +70,35 @@ public class Utils {
}
return splitLocationProvider;
}
+
+ @VisibleForTesting
+ static SplitLocationProvider getCustomSplitLocationProvider(LlapRegistryService serviceRegistry, Logger LOG) throws
+ IOException {
+ LOG.info("Using LLAP instance " + serviceRegistry.getApplicationId());
+
+ Collection<LlapServiceInstance> serviceInstances =
+ serviceRegistry.getInstances().getAllInstancesOrdered(true);
+ Preconditions.checkArgument(!serviceInstances.isEmpty(),
+ "No running LLAP daemons! Please check LLAP service status and zookeeper configuration");
+ ArrayList<String> locations = new ArrayList<>(serviceInstances.size());
+ for (LlapServiceInstance serviceInstance : serviceInstances) {
+ String executors =
+ serviceInstance.getProperties().get(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS);
+ if (executors != null && Integer.parseInt(executors) == 0) {
+ // If the number of executors is set to 0, we should not consider this location for affinity
+ locations.add(null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not adding " + serviceInstance.getWorkerIdentity() + " with hostname=" +
+ serviceInstance.getHost() + " since executor number is 0");
+ }
+ } else {
+ locations.add(serviceInstance.getHost());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" +
+ serviceInstance.getHost() + " to list for split locations");
+ }
+ }
+ }
+ return new HostAffinitySplitLocationProvider(locations);
+ }
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index 13f4676..61c98b7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -306,8 +306,8 @@ public class TestHostAffinitySplitLocationProvider {
return inputSplit;
}
- private FileSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
- long length, String[] locations) throws IOException {
+ static FileSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
+ long length, String[] locations) throws IOException {
FileSplit fileSplit;
if (createOrcSplit) {
fileSplit = mock(OrcSplit.class);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java
new file mode 100644
index 0000000..b001d02
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstance;
+import org.apache.hadoop.hive.llap.registry.LlapServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance;
+import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.registry.impl.LlapZookeeperRegistryImpl;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.split.SplitLocationProvider;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+/**
+ * Test class for Utils methods.
+ */
+public class TestUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class);
+
+ private static final String INACTIVE = "inactive";
+ private static final String ACTIVE = "dynamic";
+ private static final String DISABLED = "disabled";
+ private static final String FIXED = "fix";
+
+
+ @Mock
+ private LlapRegistryService mockRegistry;
+
+ @Mock
+ private LlapServiceInstanceSet mockInstanceSet;
+
+ @Before
+ public void setUp() {
+ initMocks(this);
+ }
+
+ @Test
+ public void testGetSplitLocationProvider() throws IOException, URISyntaxException {
+ // Create test LlapServiceInstances to make sure that we can handle all of the instance types
+ List<LlapServiceInstance> instances = new ArrayList<>(3);
+
+ // Set 1 inactive instance to make sure that this does not cause a problem for us
+ LlapServiceInstance inactive = new InactiveServiceInstance(INACTIVE);
+ instances.add(inactive);
+
+ LlapZookeeperRegistryImpl dynRegistry = new LlapZookeeperRegistryImpl("dyn", new HiveConf());
+ Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint("llap", new InetSocketAddress(ACTIVE, 4000));
+ Endpoint shuffle = RegistryTypeUtils.ipcEndpoint("shuffle", new InetSocketAddress(ACTIVE, 4000));
+ Endpoint mng = RegistryTypeUtils.ipcEndpoint("llapmng", new InetSocketAddress(ACTIVE, 4000));
+ Endpoint outputFormat = RegistryTypeUtils.ipcEndpoint("llapoutputformat", new InetSocketAddress(ACTIVE, 4000));
+ Endpoint services = RegistryTypeUtils.webEndpoint("services", new URI(ACTIVE + ":4000"));
+
+ // Set 1 active instance
+ ServiceRecord enabledSrv = new ServiceRecord();
+ enabledSrv.addInternalEndpoint(rpcEndpoint);
+ enabledSrv.addInternalEndpoint(shuffle);
+ enabledSrv.addInternalEndpoint(mng);
+ enabledSrv.addInternalEndpoint(outputFormat);
+ enabledSrv.addExternalEndpoint(services);
+
+ enabledSrv.set(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, 10);
+ enabledSrv.set(HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, 100);
+ LlapZookeeperRegistryImpl.DynamicServiceInstance dynamic =
+ dynRegistry.new DynamicServiceInstance(enabledSrv);
+ instances.add(dynamic);
+
+ // Set 1 instance with 0 executors
+ ServiceRecord disabledSrv = new ServiceRecord(enabledSrv);
+ disabledSrv.set(LlapRegistryService.LLAP_DAEMON_NUM_ENABLED_EXECUTORS, 0);
+ LlapZookeeperRegistryImpl.DynamicServiceInstance disabled =
+ dynRegistry.new DynamicServiceInstance(disabledSrv);
+ disabled.setHost(DISABLED);
+ instances.add(disabled);
+
+ when(mockRegistry.getInstances()).thenReturn(mockInstanceSet);
+ when(mockInstanceSet.getAllInstancesOrdered(anyBoolean())).thenReturn(instances);
+ SplitLocationProvider provider = Utils.getCustomSplitLocationProvider(mockRegistry, LOG);
+
+ assertLocations((HostAffinitySplitLocationProvider)provider, new String[] {ACTIVE});
+
+ // Check if fixed stuff is working as well
+ LlapFixedRegistryImpl fixRegistry = new LlapFixedRegistryImpl("llap", new HiveConf());
+
+ // Instance for testing fixed registry instances
+ LlapServiceInstance fixed = fixRegistry.new FixedServiceInstance(FIXED);
+ instances.remove(dynamic);
+ instances.add(fixed);
+
+ provider = Utils.getCustomSplitLocationProvider(mockRegistry, LOG);
+
+ assertLocations((HostAffinitySplitLocationProvider)provider, new String[] {FIXED});
+ }
+
+ private void assertLocations(HostAffinitySplitLocationProvider provider, String[] expectedLocations)
+ throws IOException {
+ InputSplit inputSplit1 =
+ TestHostAffinitySplitLocationProvider.createMockFileSplit(
+ true, "path2", 0, 1000, new String[] {"HOST-1", "HOST-2"});
+
+ // Check that the provider does not return disabled/inactive instances and returns only 1 location
+ List<String> result = new ArrayList<>(Arrays.asList(provider.getLocations(inputSplit1)));
+ assertEquals(1, result.size());
+ assertFalse(result.contains(INACTIVE));
+ assertFalse(result.contains(DISABLED));
+
+ // Since we can not check the results for every input, dig into the provider internal data to
+ // make sure that we have only the available host name in the location list
+ // Remove nulls
+ Set<String> knownLocations = new HashSet<>();
+ knownLocations.addAll(provider.locations);
+ knownLocations.remove(null);
+ assertArrayEquals(expectedLocations, knownLocations.toArray(new String[] {}));
+ }
+}