You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zt...@apache.org on 2019/04/23 09:38:54 UTC
[hadoop] branch trunk updated: YARN-9475. [YARN-9473] Create basic
VE plugin. Contributed by Peter Bacsko.
This is an automated email from the ASF dual-hosted git repository.
ztang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8a95ea6 YARN-9475. [YARN-9473] Create basic VE plugin. Contributed by Peter Bacsko.
8a95ea6 is described below
commit 8a95ea61e12384389f2103df0fcba594469cc024
Author: Zhankun Tang <zt...@apache.org>
AuthorDate: Tue Apr 23 17:33:58 2019 +0800
YARN-9475. [YARN-9473] Create basic VE plugin. Contributed by Peter Bacsko.
---
.../resourceplugin/com/nec/NECVEPlugin.java | 306 +++++++++++++++++++++
.../resourceplugin/com/nec/package-info.java | 19 ++
2 files changed, 325 insertions(+)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/NECVEPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/NECVEPlugin.java
new file mode 100644
index 0000000..d226237
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/NECVEPlugin.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.com.nec;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.CommandExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePluginScheduler;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
+import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.YarnRuntimeType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A device framework plugin which supports NEC Vector Engine.
+ *
+ */
+public class NECVEPlugin implements DevicePlugin, DevicePluginScheduler {
+ private static final String HADOOP_COMMON_HOME = "HADOOP_COMMON_HOME";
+ private static final String ENV_SCRIPT_PATH = "NEC_VE_GET_SCRIPT_PATH";
+ private static final String ENV_SCRIPT_NAME = "NEC_VE_GET_SCRIPT_NAME";
+ private static final String DEFAULT_SCRIPT_NAME = "nec-ve-get.py";
+ private static final Logger LOG = LoggerFactory.getLogger(NECVEPlugin.class);
+ private static final String[] DEFAULT_BINARY_SEARCH_DIRS = new String[]{
+ "/usr/bin", "/bin", "/opt/nec/ve/bin"};
+
+ private String binaryPath;
+
+ private Function<String[], CommandExecutor>
+ commandExecutorProvider = this::createCommandExecutor;
+
+ public NECVEPlugin() throws ResourceHandlerException {
+ this(System::getenv, DEFAULT_BINARY_SEARCH_DIRS);
+ }
+
+ @VisibleForTesting
+ NECVEPlugin(Function<String, String> envProvider, String[] scriptPaths)
+ throws ResourceHandlerException {
+ String binaryName = DEFAULT_SCRIPT_NAME;
+
+ String envScriptName = envProvider.apply(ENV_SCRIPT_NAME);
+ if (envScriptName != null) {
+ binaryName = envScriptName;
+ }
+ LOG.info("Use {} as script name.", envScriptName);
+
+ // Try to find the script based on an environment variable, if set
+ boolean found = false;
+ String envBinaryPath = envProvider.apply(ENV_SCRIPT_PATH);
+ if (envBinaryPath != null) {
+ this.binaryPath = getScriptFromEnvSetting(envBinaryPath);
+ found = binaryPath != null;
+ }
+
+ // Try $HADOOP_COMMON_HOME
+ if (!found) {
+ // print a warning only if the env variable was defined
+ if (envBinaryPath != null) {
+ LOG.warn("Script {} does not exist, falling back " +
+ "to $HADOOP_COMMON_HOME/sbin/DevicePluginScript/", envBinaryPath);
+ }
+
+ this.binaryPath = getScriptFromHadoopCommon(envProvider, binaryName);
+ found = binaryPath != null;
+ }
+
+ // Try the default search directories
+ if (!found) {
+ LOG.info("Script not found under" +
+ " $HADOOP_COMMON_HOME/sbin/DevicePluginScript/," +
+ " falling back to default search directories");
+
+ this.binaryPath = getScriptFromSearchDirs(binaryName, scriptPaths);
+ found = binaryPath != null;
+ }
+
+ // Script not found
+ if (!found) {
+ LOG.error("Script not found in "
+ + Arrays.toString(scriptPaths));
+ throw new ResourceHandlerException(
+ "No binary found for " + NECVEPlugin.class.getName());
+ }
+ }
+
+ public DeviceRegisterRequest getRegisterRequestInfo() {
+ return DeviceRegisterRequest.Builder.newInstance()
+ .setResourceName("nec.com/ve").build();
+ }
+
+ public Set<Device> getDevices() {
+ Set<Device> devices = null;
+
+ CommandExecutor executor =
+ commandExecutorProvider.apply(new String[]{this.binaryPath});
+ try {
+ executor.execute();
+ String output = executor.getOutput();
+ devices = parseOutput(output);
+ } catch (IOException e) {
+ LOG.warn(e.toString());
+ }
+ return devices;
+ }
+
+ public DeviceRuntimeSpec onDevicesAllocated(Set<Device> set,
+ YarnRuntimeType yarnRuntimeType) {
+ return null;
+ }
+
+ /**
+ * Parses the output of the external Python script.
+ *
+ * Sample line:
+ * id=0, dev=/dev/ve0, state=ONLINE, busId=0000:65:00.0, major=243, minor=0
+ */
+ private Set<Device> parseOutput(String output) {
+ Set<Device> devices = new HashSet<>();
+
+ LOG.info("Parsing output: {}", output);
+ String[] lines = output.split("\n");
+ for (String line : lines) {
+ Device.Builder builder = Device.Builder.newInstance();
+
+ // map key --> builder calls
+ Map<String, Consumer<String>> builderInvocations =
+ getBuilderInvocationsMap(builder);
+
+ String[] keyValues = line.trim().split(",");
+ for (String keyValue : keyValues) {
+ String[] tokens = keyValue.trim().split("=");
+ if (tokens.length != 2) {
+ LOG.error("Unknown format of script output! Skipping this line");
+ continue;
+ }
+
+ final String key = tokens[0];
+ final String value = tokens[1];
+
+ Consumer<String> builderInvocation = builderInvocations.get(key);
+ if (builderInvocation != null) {
+ builderInvocation.accept(value);
+ } else {
+ LOG.warn("Unknown key {}, ignored", key);
+ }
+ }// for key value pairs
+ Device device = builder.build();
+ if (device.isHealthy()) {
+ devices.add(device);
+ } else {
+ LOG.warn("Skipping device {} because it's not healthy", device);
+ }
+ }
+
+ return devices;
+ }
+
+ @Override
+ public void onDevicesReleased(Set<Device> releasedDevices) {
+ // nop
+ }
+
+ @Override
+ public Set<Device> allocateDevices(Set<Device> availableDevices, int count,
+ Map<String, String> env) {
+ // Can consider topology, utilization.etc
+ Set<Device> allocated = new HashSet<>();
+ int number = 0;
+ for (Device d : availableDevices) {
+ allocated.add(d);
+ number++;
+ if (number == count) {
+ break;
+ }
+ }
+ return allocated;
+ }
+
+ private CommandExecutor createCommandExecutor(String[] command) {
+ return new Shell.ShellCommandExecutor(
+ command);
+ }
+
+ private String getScriptFromEnvSetting(String envBinaryPath) {
+ LOG.info("Checking script path: {}", envBinaryPath);
+ File f = new File(envBinaryPath);
+
+ if (!f.exists()) {
+ LOG.warn("Script {} does not exist", envBinaryPath);
+ return null;
+ }
+
+ if (f.isDirectory()) {
+ LOG.warn("Specified path {} is a directory", envBinaryPath);
+ return null;
+ }
+
+ if (!FileUtil.canExecute(f)) {
+ LOG.warn("Script {} is not executable", envBinaryPath);
+ return null;
+ }
+
+ LOG.info("Found script: {}", envBinaryPath);
+
+ return envBinaryPath;
+ }
+
+ private String getScriptFromHadoopCommon(
+ Function<String, String> envProvider, String binaryName) {
+ String scriptPath = null;
+ String hadoopCommon = envProvider.apply(HADOOP_COMMON_HOME);
+
+ if (hadoopCommon != null) {
+ String targetPath = hadoopCommon +
+ "/sbin/DevicePluginScript/" + binaryName;
+ LOG.info("Checking script {}: ", targetPath);
+ if (new File(targetPath).exists()) {
+ LOG.info("Found script: {}", targetPath);
+ scriptPath = targetPath;
+ }
+ } else {
+ LOG.info("$HADOOP_COMMON_HOME is not set");
+ }
+
+ return scriptPath;
+ }
+
+ private String getScriptFromSearchDirs(String binaryName,
+ String[] scriptPaths) {
+ String scriptPath = null;
+
+ for (String dir : scriptPaths) {
+ File f = new File(dir, binaryName);
+ if (f.exists()) {
+ LOG.info("Found script: {}", dir);
+ scriptPath = f.getAbsolutePath();
+ break;
+ }
+ }
+
+ return scriptPath;
+ }
+
+ private Map<String, Consumer<String>> getBuilderInvocationsMap(
+ Device.Builder builder) {
+ Map<String, Consumer<String>> builderInvocations = new HashMap<>();
+ builderInvocations.put("id", v -> builder.setId(Integer.parseInt(v)));
+ builderInvocations.put("dev", v -> builder.setDevPath(v));
+ builderInvocations.put("state", v -> {
+ if (v.equals("ONLINE")) {
+ builder.setHealthy(true);
+ }
+ builder.setStatus(v);
+ });
+ builderInvocations.put("busId", v -> builder.setBusID(v));
+ builderInvocations.put("major",
+ v -> builder.setMajorNumber(Integer.parseInt(v)));
+ builderInvocations.put("minor",
+ v -> builder.setMinorNumber(Integer.parseInt(v)));
+
+ return builderInvocations;
+ }
+
+ @VisibleForTesting
+ void setCommandExecutorProvider(
+ Function<String[], CommandExecutor> provider) {
+ this.commandExecutorProvider = provider;
+ }
+
+ @VisibleForTesting
+ String getBinaryPath() {
+ return binaryPath;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/package-info.java
new file mode 100644
index 0000000..8f7fd67
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/com/nec/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.com.nec;
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org