You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/12/01 18:55:33 UTC
[1/2] hadoop git commit: YARN-6507. Add support in NodeManager to
isolate FPGA devices with CGroups. (Zhankun Tang via wangda)
Repository: hadoop
Updated Branches:
refs/heads/trunk 5304698dc -> 7225ec0ce
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/TestFpgaResourceHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/TestFpgaResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/TestFpgaResourceHandler.java
new file mode 100644
index 0000000..d3d55fa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/TestFpgaResourceHandler.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceSet;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for {@link FpgaResourceHandlerImpl}: bootstrap with various
+ * allowed-device settings, container allocation and release, IP
+ * (re)configuration, state-store persistence and container reacquisition.
+ *
+ * All collaborators (CGroups handler, privileged executor, NM state store,
+ * NM context and vendor plugin) are Mockito mocks. The mocked vendor plugin
+ * discovers five devices of type {@link #VENDOR_TYPE} with major number 247
+ * and minor numbers 0-4.
+ */
+public class TestFpgaResourceHandler {
+  private Context mockContext;
+  private FpgaResourceHandlerImpl fpgaResourceHandler;
+  private Configuration configuration;
+  private CGroupsHandler mockCGroupsHandler;
+  private PrivilegedOperationExecutor mockPrivilegedExecutor;
+  private NMStateStoreService mockNMStateStore;
+  private ConcurrentHashMap<ContainerId, Container> runningContainersMap;
+  private IntelFpgaOpenclPlugin mockVendorPlugin;
+  /** FPGA type reported by the mocked vendor plugin. */
+  private static final String VENDOR_TYPE = "IntelOpenCL";
+
+  @Before
+  public void setup() {
+    TestResourceUtils.addNewTypesToResources(ResourceInformation.FPGA_URI);
+    configuration = new YarnConfiguration();
+
+    mockCGroupsHandler = mock(CGroupsHandler.class);
+    mockPrivilegedExecutor = mock(PrivilegedOperationExecutor.class);
+    mockNMStateStore = mock(NMStateStoreService.class);
+    mockContext = mock(Context.class);
+    // Assumed devices parsed from output: major 247, minors 0..4
+    List<FpgaResourceAllocator.FpgaDevice> list = new ArrayList<>();
+    for (int minor = 0; minor < 5; minor++) {
+      list.add(new FpgaResourceAllocator.FpgaDevice(VENDOR_TYPE, 247, minor, null));
+    }
+    mockVendorPlugin = mockPlugin(VENDOR_TYPE, list);
+    FpgaDiscoverer.getInstance().setConf(configuration);
+    when(mockContext.getNMStateStore()).thenReturn(mockNMStateStore);
+    runningContainersMap = new ConcurrentHashMap<>();
+    when(mockContext.getContainers()).thenReturn(runningContainersMap);
+
+    fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
+        mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
+  }
+
+  @Test
+  public void testBootstrap() throws ResourceHandlerException {
+    // Case 1. auto
+    String allowed = "auto";
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
+    fpgaResourceHandler.bootstrap(configuration);
+    verify(mockVendorPlugin, times(1)).initPlugin(configuration);
+    verify(mockCGroupsHandler, times(1)).initializeCGroupController(
+        CGroupsHandler.CGroupController.DEVICES);
+    Assert.assertEquals(5, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+    Assert.assertEquals(5, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
+    // Case 2. subset of devices
+    fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
+        mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
+    allowed = "0,1,2";
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
+    fpgaResourceHandler.bootstrap(configuration);
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
+    List<FpgaResourceAllocator.FpgaDevice> allowedDevices =
+        fpgaResourceHandler.getFpgaAllocator().getAllowedFpga();
+    for (String s : allowed.split(",")) {
+      boolean check = false;
+      for (FpgaResourceAllocator.FpgaDevice device : allowedDevices) {
+        if (device.getMinor().toString().equals(s)) {
+          check = true;
+          break;
+        }
+      }
+      Assert.assertTrue("Minor:" + s + " not found in allowed devices", check);
+    }
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+
+    // Case 3. User configuration contains invalid minor device number
+    fpgaResourceHandler = new FpgaResourceHandlerImpl(mockContext,
+        mockCGroupsHandler, mockPrivilegedExecutor, mockVendorPlugin);
+    allowed = "0,1,7";
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
+    fpgaResourceHandler.bootstrap(configuration);
+    Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+    Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
+  }
+
+  @Test
+  public void testBootstrapWithInvalidUserConfiguration() throws ResourceHandlerException {
+    // User configuration contains invalid minor device number
+    String allowed = "0,1,7";
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
+    fpgaResourceHandler.bootstrap(configuration);
+    Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAllowedFpga().size());
+    Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+
+    // Malformed device lists must be rejected by bootstrap
+    String[] invalidAllowedStrings = {"a,1,2,", "a,1,2", "0,1,2,#", "a", "1,"};
+    for (String s : invalidAllowedStrings) {
+      boolean invalidConfiguration = false;
+      configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, s);
+      try {
+        fpgaResourceHandler.bootstrap(configuration);
+      } catch (ResourceHandlerException e) {
+        invalidConfiguration = true;
+      }
+      Assert.assertTrue("Expected bootstrap to reject: " + s, invalidConfiguration);
+    }
+
+    // Well-formed device lists must be accepted
+    String[] allowedStrings = {"1,2", "1"};
+    for (String s : allowedStrings) {
+      boolean invalidConfiguration = false;
+      configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, s);
+      try {
+        fpgaResourceHandler.bootstrap(configuration);
+      } catch (ResourceHandlerException e) {
+        invalidConfiguration = true;
+      }
+      Assert.assertFalse("Expected bootstrap to accept: " + s, invalidConfiguration);
+    }
+  }
+
+  @Test
+  public void testBootStrapWithEmptyUserConfiguration() throws ResourceHandlerException {
+    // An empty allowed-devices setting must be rejected by bootstrap
+    String allowed = "";
+    boolean invalidConfiguration = false;
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, allowed);
+    try {
+      fpgaResourceHandler.bootstrap(configuration);
+    } catch (ResourceHandlerException e) {
+      invalidConfiguration = true;
+    }
+    Assert.assertTrue(invalidConfiguration);
+  }
+
+  @Test
+  public void testAllocationWithPreference()
+      throws ResourceHandlerException, PrivilegedOperationException {
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
+    fpgaResourceHandler.bootstrap(configuration);
+    // Case 1. The id-0 container requests 1 FPGA of IntelOpenCL type and GEMM IP
+    fpgaResourceHandler.preStart(mockContainer(0, 1, "GEMM"));
+    Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    verifyDeniedDevices(getContainerId(0), Arrays.asList(1, 2));
+    List<FpgaResourceAllocator.FpgaDevice> list = fpgaResourceHandler.getFpgaAllocator()
+        .getUsedFpga().get(getContainerId(0).toString());
+    Assert.assertNotNull("No devices recorded for container 0", list);
+    for (FpgaResourceAllocator.FpgaDevice device : list) {
+      Assert.assertEquals("IP should be updated to GEMM", "GEMM", device.getIPID());
+    }
+    // Case 2. The id-1 container requests 3 FPGAs with GZIP IP while only
+    // 2 are free; this should fail
+    boolean flag = false;
+    try {
+      fpgaResourceHandler.preStart(mockContainer(1, 3, "GZIP"));
+    } catch (ResourceHandlerException e) {
+      flag = true;
+    }
+    Assert.assertTrue(flag);
+    // Case 3. Release the id-0 container
+    fpgaResourceHandler.postComplete(getContainerId(0));
+    Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+    // Now we have enough devices, re-allocate for the id-1 container
+    fpgaResourceHandler.preStart(mockContainer(1, 3, "GEMM"));
+    // Id-1 container should have 0 denied devices
+    verifyDeniedDevices(getContainerId(1), new ArrayList<>());
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+    // Release container id-1
+    fpgaResourceHandler.postComplete(getContainerId(1));
+    Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+    // Case 4. Now all 3 devices should have IPID GEMM
+    // Try container id-2 and id-3
+    fpgaResourceHandler.preStart(mockContainer(2, 1, "GZIP"));
+    fpgaResourceHandler.postComplete(getContainerId(2));
+    fpgaResourceHandler.preStart(mockContainer(3, 2, "GEMM"));
+
+    // IPID should be GEMM for id-3 container
+    list = fpgaResourceHandler.getFpgaAllocator()
+        .getUsedFpga().get(getContainerId(3).toString());
+    Assert.assertNotNull("No devices recorded for container 3", list);
+    for (FpgaResourceAllocator.FpgaDevice device : list) {
+      Assert.assertEquals("IPID should be GEMM", "GEMM", device.getIPID());
+    }
+    Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+    fpgaResourceHandler.postComplete(getContainerId(3));
+    Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+
+    // Case 5. id-4 requests 0 FPGA devices
+    fpgaResourceHandler.preStart(mockContainer(4, 0, ""));
+    // Deny all devices for id-4
+    verifyDeniedDevices(getContainerId(4), Arrays.asList(0, 1, 2));
+    Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+
+    // Case 6. id-5 requests a negative number of FPGAs; it must be rejected.
+    // The flag makes the test fail if preStart unexpectedly succeeds.
+    flag = false;
+    Container invalidRequest = mockContainer(5, -2, "");
+    try {
+      fpgaResourceHandler.preStart(invalidRequest);
+    } catch (ResourceHandlerException e) {
+      flag = true;
+    }
+    Assert.assertTrue("Negative FPGA request should be rejected", flag);
+  }
+
+  @Test
+  public void testsAllocationWithExistingIPIDDevices()
+      throws ResourceHandlerException, PrivilegedOperationException {
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
+    fpgaResourceHandler.bootstrap(configuration);
+    // The id-0 container requests 3 FPGAs of IntelOpenCL type and GEMM IP
+    fpgaResourceHandler.preStart(mockContainer(0, 3, "GEMM"));
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    List<FpgaResourceAllocator.FpgaDevice> list = fpgaResourceHandler.getFpgaAllocator()
+        .getUsedFpga().get(getContainerId(0).toString());
+    Assert.assertNotNull("No devices recorded for container 0", list);
+    fpgaResourceHandler.postComplete(getContainerId(0));
+    for (FpgaResourceAllocator.FpgaDevice device : list) {
+      Assert.assertEquals("IP should be updated to GEMM", "GEMM", device.getIPID());
+    }
+
+    // Case 1. id-1 and id-2 request GEMM on devices already programmed with
+    // GEMM, so the configureIP count must stay at the 3 calls seen so far
+    fpgaResourceHandler.preStart(mockContainer(1, 1, "GEMM"));
+    fpgaResourceHandler.preStart(mockContainer(2, 1, "GEMM"));
+    verify(mockVendorPlugin, times(3)).configureIP(anyString(), anyString());
+    fpgaResourceHandler.postComplete(getContainerId(1));
+    fpgaResourceHandler.postComplete(getContainerId(2));
+
+    // Case 2. id-1 is started again requesting GZIP, which no device has,
+    // so exactly one more configureIP call is expected
+    fpgaResourceHandler.preStart(mockContainer(1, 1, "GZIP"));
+    verify(mockVendorPlugin, times(4)).configureIP(anyString(), anyString());
+  }
+
+  @Test
+  public void testAllocationWithZeroDevices()
+      throws ResourceHandlerException, PrivilegedOperationException {
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
+    fpgaResourceHandler.bootstrap(configuration);
+    // The id-0 container requests 0 FPGA
+    fpgaResourceHandler.preStart(mockContainer(0, 0, null));
+    verifyDeniedDevices(getContainerId(0), Arrays.asList(0, 1, 2));
+    verify(mockVendorPlugin, times(0)).downloadIP(anyString(), anyString(), anyMap());
+    verify(mockVendorPlugin, times(0)).configureIP(anyString(), anyString());
+  }
+
+  @Test
+  public void testStateStore() throws ResourceHandlerException, IOException {
+    // Case 1. store 3 devices
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
+    fpgaResourceHandler.bootstrap(configuration);
+    Container container0 = mockContainer(0, 3, "GEMM");
+    fpgaResourceHandler.preStart(container0);
+    List<FpgaResourceAllocator.FpgaDevice> assigned =
+        fpgaResourceHandler.getFpgaAllocator().getUsedFpga().get(getContainerId(0).toString());
+    Assert.assertNotNull("No devices recorded for container 0", assigned);
+    verify(mockNMStateStore).storeAssignedResources(container0,
+        ResourceInformation.FPGA_URI,
+        new ArrayList<>(assigned));
+    fpgaResourceHandler.postComplete(getContainerId(0));
+    // Case 2. ask 0, no store api called
+    Container container1 = mockContainer(1, 0, "");
+    fpgaResourceHandler.preStart(container1);
+    verify(mockNMStateStore, never()).storeAssignedResources(
+        eq(container1), eq(ResourceInformation.FPGA_URI), anyList());
+  }
+
+  @Test
+  public void testReacquireContainer() throws ResourceHandlerException {
+
+    Container c0 = mockContainer(0, 2, "GEMM");
+    List<FpgaResourceAllocator.FpgaDevice> assigned = new ArrayList<>();
+    assigned.add(new FpgaResourceAllocator.FpgaDevice(VENDOR_TYPE, 247, 0, null));
+    assigned.add(new FpgaResourceAllocator.FpgaDevice(VENDOR_TYPE, 247, 1, null));
+    // Mock we've stored the c0 states
+    mockStateStoreForContainer(c0, assigned);
+    // NM start
+    configuration.set(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES, "0,1,2");
+    fpgaResourceHandler.bootstrap(configuration);
+    Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+    // Case 1. try recover state for id-0 container
+    fpgaResourceHandler.reacquireContainer(getContainerId(0));
+    // minor number matches
+    List<FpgaResourceAllocator.FpgaDevice> used = fpgaResourceHandler.getFpgaAllocator()
+        .getUsedFpga().get(getContainerId(0).toString());
+    Assert.assertNotNull("No devices recovered for container 0", used);
+    int count = 0;
+    for (FpgaResourceAllocator.FpgaDevice device : used) {
+      if (device.getMinor().equals(0) || device.getMinor().equals(1)) {
+        count++;
+      }
+    }
+    Assert.assertEquals("Unexpected used minor number in allocator", 2, count);
+    List<FpgaResourceAllocator.FpgaDevice> available = fpgaResourceHandler.getFpgaAllocator()
+        .getAvailableFpga().get(VENDOR_TYPE);
+    Assert.assertNotNull("No available devices of type " + VENDOR_TYPE, available);
+    count = 0;
+    for (FpgaResourceAllocator.FpgaDevice device : available) {
+      if (device.getMinor().equals(2)) {
+        count++;
+      }
+    }
+    Assert.assertEquals("Unexpected available minor number in allocator", 1, count);
+
+
+    // Case 2. Recover a not allowed device with minor number 5
+    Container c1 = mockContainer(1, 1, "GEMM");
+    assigned = new ArrayList<>();
+    assigned.add(new FpgaResourceAllocator.FpgaDevice(VENDOR_TYPE, 247, 5, null));
+    // Mock we've stored the c1 states
+    mockStateStoreForContainer(c1, assigned);
+    boolean flag = false;
+    try {
+      fpgaResourceHandler.reacquireContainer(getContainerId(1));
+    } catch (ResourceHandlerException e) {
+      flag = true;
+    }
+    Assert.assertTrue(flag);
+    Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+
+    // Case 3. recover a device already used by another container
+    Container c2 = mockContainer(2, 1, "GEMM");
+    assigned = new ArrayList<>();
+    assigned.add(new FpgaResourceAllocator.FpgaDevice(VENDOR_TYPE, 247, 1, null));
+    // Mock we've stored the c2 states
+    mockStateStoreForContainer(c2, assigned);
+    flag = false;
+    try {
+      fpgaResourceHandler.reacquireContainer(getContainerId(2));
+    } catch (ResourceHandlerException e) {
+      flag = true;
+    }
+    Assert.assertTrue(flag);
+    Assert.assertEquals(2, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(1, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+
+    // Case 4. recover a normal container c3 with remaining minor device number 2
+    Container c3 = mockContainer(3, 1, "GEMM");
+    assigned = new ArrayList<>();
+    assigned.add(new FpgaResourceAllocator.FpgaDevice(VENDOR_TYPE, 247, 2, null));
+    // Mock we've stored the c3 states
+    mockStateStoreForContainer(c3, assigned);
+    fpgaResourceHandler.reacquireContainer(getContainerId(3));
+    Assert.assertEquals(3, fpgaResourceHandler.getFpgaAllocator().getUsedFpgaCount());
+    Assert.assertEquals(0, fpgaResourceHandler.getFpgaAllocator().getAvailableFpgaCount());
+  }
+
+  /**
+   * Verifies that a devices cgroup was created for {@code containerId}.
+   * Also verifies that exactly one FPGA privileged operation was issued
+   * for it with the expected arguments.
+   *
+   * @param containerId container whose cgroup setup is checked
+   * @param deniedDevices minor numbers expected in the excluded-FPGA option;
+   *     {@code null} or empty means the operation carries only the
+   *     container-id option
+   */
+  private void verifyDeniedDevices(ContainerId containerId,
+      List<Integer> deniedDevices)
+      throws ResourceHandlerException, PrivilegedOperationException {
+    verify(mockCGroupsHandler, atLeastOnce()).createCGroup(
+        CGroupsHandler.CGroupController.DEVICES, containerId.toString());
+
+    List<String> expectedArgs;
+    if (deniedDevices == null || deniedDevices.isEmpty()) {
+      expectedArgs = Arrays.asList(
+          FpgaResourceHandlerImpl.CONTAINER_ID_CLI_OPTION,
+          containerId.toString());
+    } else {
+      expectedArgs = Arrays.asList(
+          FpgaResourceHandlerImpl.CONTAINER_ID_CLI_OPTION,
+          containerId.toString(),
+          FpgaResourceHandlerImpl.EXCLUDED_FPGAS_CLI_OPTION,
+          StringUtils.join(",", deniedDevices));
+    }
+    verify(mockPrivilegedExecutor, times(1)).executePrivilegedOperation(
+        new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA,
+            expectedArgs), true);
+  }
+
+  /**
+   * Builds a mocked vendor plugin.
+   * {@code initPlugin} and {@code configureIP} succeed, {@code downloadIP}
+   * returns "/tmp", and {@code discover} returns {@code list}.
+   */
+  private static IntelFpgaOpenclPlugin mockPlugin(String type,
+      List<FpgaResourceAllocator.FpgaDevice> list) {
+    IntelFpgaOpenclPlugin plugin = mock(IntelFpgaOpenclPlugin.class);
+    when(plugin.initPlugin(Mockito.anyObject())).thenReturn(true);
+    when(plugin.getFpgaType()).thenReturn(type);
+    when(plugin.downloadIP(Mockito.anyString(), Mockito.anyString(), Mockito.anyMap()))
+        .thenReturn("/tmp");
+    when(plugin.configureIP(Mockito.anyString(), Mockito.anyObject())).thenReturn(true);
+    when(plugin.discover(Mockito.anyInt())).thenReturn(list);
+    return plugin;
+  }
+
+
+  /**
+   * Builds a mocked container.
+   * The container requests {@code numFpga} FPGAs (plus 1024 MB and
+   * 1 vcore). {@code REQUESTED_FPGA_IP_ID} is set to {@code IPID} in its
+   * launch environment only when {@code numFpga > 0}.
+   */
+  private static Container mockContainer(int id, int numFpga, String IPID) {
+    Container c = mock(Container.class);
+
+    Resource res = Resource.newInstance(1024, 1);
+    ResourceMappings resMapping = new ResourceMappings();
+    res.setResourceValue(ResourceInformation.FPGA_URI, numFpga);
+    when(c.getResource()).thenReturn(res);
+    when(c.getResourceMappings()).thenReturn(resMapping);
+
+    when(c.getContainerId()).thenReturn(getContainerId(id));
+
+    ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
+    Map<String, String> envs = new HashMap<>();
+    if (numFpga > 0) {
+      envs.put("REQUESTED_FPGA_IP_ID", IPID);
+    }
+    when(c.getLaunchContext()).thenReturn(clc);
+    when(clc.getEnvironment()).thenReturn(envs);
+    when(c.getWorkDir()).thenReturn("/tmp");
+    ResourceSet resourceSet = new ResourceSet();
+    when(c.getResourceSet()).thenReturn(resourceSet);
+
+    return c;
+  }
+
+  /**
+   * Simulates recovered NM state for {@code container}.
+   * Its resource mappings report {@code assigned} as the FPGA assignment,
+   * and the container is registered in the running-containers map.
+   */
+  private void mockStateStoreForContainer(Container container,
+      List<FpgaResourceAllocator.FpgaDevice> assigned) {
+    ResourceMappings rmap = new ResourceMappings();
+    ResourceMappings.AssignedResources ar =
+        new ResourceMappings.AssignedResources();
+    ar.updateAssignedResources(new ArrayList<>(assigned));
+    rmap.addAssignedResources(ResourceInformation.FPGA_URI, ar);
+    when(container.getResourceMappings()).thenReturn(rmap);
+    runningContainersMap.put(container.getContainerId(), container);
+  }
+
+  /** Container id {@code id} of attempt 1 of application (1234, 1). */
+  private static ContainerId getContainerId(int id) {
+    return ContainerId.newContainerId(ApplicationAttemptId
+        .newInstance(ApplicationId.newInstance(1234L, 1), 1), id);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/TestFpgaDiscoverer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/TestFpgaDiscoverer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/TestFpgaDiscoverer.java
new file mode 100644
index 0000000..87fb4e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/TestFpgaDiscoverer.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link FpgaDiscoverer} driving an {@link IntelFpgaOpenclPlugin}.
+ * Covers how the plugin resolves the configured executable path. Also
+ * covers how it parses diagnose output into
+ * {@link FpgaResourceAllocator.FpgaDevice}s and its alias map.
+ */
+public class TestFpgaDiscoverer {
+
+  /** Absolute path of a scratch folder private to this test class. */
+  private String getTestParentFolder() {
+    File f = new File("target/temp/" + TestFpgaDiscoverer.class.getName());
+    return f.getAbsolutePath();
+  }
+
+  /** Creates {@code f} as an empty file (truncating it if it exists). */
+  private void touchFile(File f) throws IOException {
+    new FileOutputStream(f).close();
+  }
+
+  @Before
+  public void before() throws IOException {
+    String folder = getTestParentFolder();
+    File f = new File(folder);
+    FileUtils.deleteDirectory(f);
+    // Fail fast here rather than with a misleading assertion later, since
+    // the tests write a fake binary into this folder.
+    Assert.assertTrue("Failed to create test folder " + folder,
+        f.mkdirs() || f.isDirectory());
+  }
+
+  @Test
+  public void testLinuxFpgaResourceDiscoverPluginConfig() throws YarnException, IOException {
+    Configuration conf = new Configuration(false);
+    FpgaDiscoverer discoverer = FpgaDiscoverer.getInstance();
+
+    IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
+    // because FPGA discoverer is a singleton, we use setPlugin to make
+    // FpgaDiscoverer.getInstance().diagnose() work in openclPlugin.initPlugin()
+    discoverer.setResourceHanderPlugin(openclPlugin);
+    openclPlugin.initPlugin(conf);
+    openclPlugin.setShell(mockPluginShell());
+
+    discoverer.initialize(conf);
+    // Case 1. No configuration set for binary
+    Assert.assertEquals("No configuration should return just a single binary name",
+        "aocl", openclPlugin.getPathToExecutable());
+
+    // Case 2. With correct configuration and file exists
+    File fakeBinary = new File(getTestParentFolder() + "/aocl");
+    conf.set(YarnConfiguration.NM_FPGA_PATH_TO_EXEC, getTestParentFolder() + "/aocl");
+    touchFile(fakeBinary);
+    discoverer.initialize(conf);
+    Assert.assertEquals("Correct configuration should return user setting",
+        getTestParentFolder() + "/aocl", openclPlugin.getPathToExecutable());
+
+    // Case 3. With correct configuration but file doesn't exist. Use default
+    Assert.assertTrue("Failed to delete " + fakeBinary, fakeBinary.delete());
+    discoverer.initialize(conf);
+    Assert.assertEquals("Correct configuration but file doesn't exists should return just a single binary name",
+        "aocl", openclPlugin.getPathToExecutable());
+
+  }
+
+  @Test
+  public void testDiscoverPluginParser() throws YarnException {
+    String output = "------------------------- acl0 -------------------------\n" +
+        "Vendor: Nallatech ltd\n" +
+        "Phys Dev Name Status Information\n" +
+        "aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0)\n" +
+        " PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8\n" +
+        " FPGA temperature = 53.1 degrees C.\n" +
+        " Total Card Power Usage = 31.7 Watts.\n" +
+        " Device Power Usage = 0.0 Watts.\n" +
+        "DIAGNOSTIC_PASSED" +
+        "---------------------------------------------------------\n";
+    output = output +
+        "------------------------- acl1 -------------------------\n" +
+        "Vendor: Nallatech ltd\n" +
+        "Phys Dev Name Status Information\n" +
+        "aclnalla_pcie1Passed nalla_pcie (aclnalla_pcie1)\n" +
+        " PCIe dev_id = 2495, bus:slot.func = 03:00.00, Gen3 x8\n" +
+        " FPGA temperature = 43.1 degrees C.\n" +
+        " Total Card Power Usage = 11.7 Watts.\n" +
+        " Device Power Usage = 0.0 Watts.\n" +
+        "DIAGNOSTIC_PASSED" +
+        "---------------------------------------------------------\n";
+    output = output +
+        "------------------------- acl2 -------------------------\n" +
+        "Vendor: Intel(R) Corporation\n" +
+        "\n" +
+        "Phys Dev Name Status Information\n" +
+        "\n" +
+        "acla10_ref0 Passed Arria 10 Reference Platform (acla10_ref0)\n" +
+        " PCIe dev_id = 2494, bus:slot.func = 09:00.00, Gen2 x8\n" +
+        " FPGA temperature = 50.5781 degrees C.\n" +
+        "\n" +
+        "DIAGNOSTIC_PASSED\n" +
+        "---------------------------------------------------------\n";
+    Configuration conf = new Configuration(false);
+    IntelFpgaOpenclPlugin openclPlugin = new IntelFpgaOpenclPlugin();
+    FpgaDiscoverer.getInstance().setResourceHanderPlugin(openclPlugin);
+
+    openclPlugin.initPlugin(conf);
+    openclPlugin.setShell(mockPluginShell());
+
+    FpgaDiscoverer.getInstance().initialize(conf);
+
+    List<FpgaResourceAllocator.FpgaDevice> list = new LinkedList<>();
+
+    // Case 1. core parsing
+    openclPlugin.parseDiagnoseInfo(output, list);
+    Assert.assertEquals(3, list.size());
+    Assert.assertEquals("IntelOpenCL", list.get(0).getType());
+    Assert.assertEquals("247", list.get(0).getMajor().toString());
+    Assert.assertEquals("0", list.get(0).getMinor().toString());
+    Assert.assertEquals("acl0", list.get(0).getAliasDevName());
+    Assert.assertEquals("aclnalla_pcie0", list.get(0).getDevName());
+    Assert.assertEquals("02:00.00", list.get(0).getBusNum());
+    Assert.assertEquals("53.1 degrees C", list.get(0).getTemperature());
+    Assert.assertEquals("31.7 Watts", list.get(0).getCardPowerUsage());
+
+    Assert.assertEquals("IntelOpenCL", list.get(1).getType());
+    Assert.assertEquals("247", list.get(1).getMajor().toString());
+    Assert.assertEquals("1", list.get(1).getMinor().toString());
+    Assert.assertEquals("acl1", list.get(1).getAliasDevName());
+    Assert.assertEquals("aclnalla_pcie1", list.get(1).getDevName());
+    Assert.assertEquals("03:00.00", list.get(1).getBusNum());
+    Assert.assertEquals("43.1 degrees C", list.get(1).getTemperature());
+    Assert.assertEquals("11.7 Watts", list.get(1).getCardPowerUsage());
+
+    Assert.assertEquals("IntelOpenCL", list.get(2).getType());
+    Assert.assertEquals("246", list.get(2).getMajor().toString());
+    Assert.assertEquals("0", list.get(2).getMinor().toString());
+    Assert.assertEquals("acl2", list.get(2).getAliasDevName());
+    Assert.assertEquals("acla10_ref0", list.get(2).getDevName());
+    Assert.assertEquals("09:00.00", list.get(2).getBusNum());
+    Assert.assertEquals("50.5781 degrees C", list.get(2).getTemperature());
+    Assert.assertEquals("", list.get(2).getCardPowerUsage());
+
+    // Case 2. check alias map
+    Map<String, String> aliasMap = openclPlugin.getAliasMap();
+    Assert.assertEquals("acl0", aliasMap.get("247:0"));
+    Assert.assertEquals("acl1", aliasMap.get("247:1"));
+    Assert.assertEquals("acl2", aliasMap.get("246:0"));
+  }
+
+  /**
+   * Builds a mocked shell executor.
+   * {@code runDiagnose} returns an empty string, and the three device
+   * names used by the parser test are mapped to fixed "major:minor" pairs.
+   */
+  private IntelFpgaOpenclPlugin.InnerShellExecutor mockPluginShell() {
+    IntelFpgaOpenclPlugin.InnerShellExecutor shell =
+        mock(IntelFpgaOpenclPlugin.InnerShellExecutor.class);
+    when(shell.runDiagnose(anyString(), anyInt())).thenReturn("");
+    when(shell.getMajorAndMinorNumber("aclnalla_pcie0")).thenReturn("247:0");
+    when(shell.getMajorAndMinorNumber("aclnalla_pcie1")).thenReturn("247:1");
+    when(shell.getMajorAndMinorNumber("acla10_ref0")).thenReturn("246:0");
+    return shell;
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-6507. Add support in NodeManager to
isolate FPGA devices with CGroups. (Zhankun Tang via wangda)
Posted by wa...@apache.org.
YARN-6507. Add support in NodeManager to isolate FPGA devices with CGroups. (Zhankun Tang via wangda)
Change-Id: Ic9afd841805f1035423915a0b0add5f3ba96cf9d
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7225ec0c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7225ec0c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7225ec0c
Branch: refs/heads/trunk
Commit: 7225ec0ceb49ae8f5588484297a20f07ec047420
Parents: 5304698
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Dec 1 10:50:49 2017 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Dec 1 10:50:49 2017 -0800
----------------------------------------------------------------------
.../yarn/api/records/ResourceInformation.java | 5 +-
.../hadoop/yarn/conf/YarnConfiguration.java | 25 +-
.../src/main/resources/yarn-default.xml | 42 +-
.../linux/privileged/PrivilegedOperation.java | 1 +
.../resources/fpga/FpgaResourceAllocator.java | 413 +++++++++++++++++
.../resources/fpga/FpgaResourceHandlerImpl.java | 220 +++++++++
.../resourceplugin/ResourcePluginManager.java | 8 +-
.../fpga/AbstractFpgaVendorPlugin.java | 90 ++++
.../resourceplugin/fpga/FpgaDiscoverer.java | 139 ++++++
.../fpga/FpgaNodeResourceUpdateHandler.java | 71 +++
.../resourceplugin/fpga/FpgaResourcePlugin.java | 105 +++++
.../fpga/IntelFpgaOpenclPlugin.java | 396 ++++++++++++++++
.../resources/fpga/TestFpgaResourceHandler.java | 458 +++++++++++++++++++
.../resourceplugin/fpga/TestFpgaDiscoverer.java | 187 ++++++++
14 files changed, 2155 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
index 67592cc..a8198d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
@@ -42,6 +42,7 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
public static final String MEMORY_URI = "memory-mb";
public static final String VCORES_URI = "vcores";
public static final String GPU_URI = "yarn.io/gpu";
+ public static final String FPGA_URI = "yarn.io/fpga";
public static final ResourceInformation MEMORY_MB =
ResourceInformation.newInstance(MEMORY_URI, "Mi");
@@ -49,9 +50,11 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
ResourceInformation.newInstance(VCORES_URI);
public static final ResourceInformation GPUS =
ResourceInformation.newInstance(GPU_URI);
+ public static final ResourceInformation FPGAS =
+ ResourceInformation.newInstance(FPGA_URI);
public static final Map<String, ResourceInformation> MANDATORY_RESOURCES =
- ImmutableMap.of(MEMORY_URI, MEMORY_MB, VCORES_URI, VCORES, GPU_URI, GPUS);
+ ImmutableMap.of(MEMORY_URI, MEMORY_MB, VCORES_URI, VCORES, GPU_URI, GPUS, FPGA_URI, FPGAS);
/**
* Get the name for the resource.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c1024ea..831abf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1514,13 +1514,36 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NVIDIA_DOCKER_PLUGIN_V1_ENDPOINT =
"http://localhost:3476/v1.0/docker/cli";
+ /**
+ * Prefix for FPGA configurations. Work in progress: This configuration
+ * parameter may be changed/removed in the future.
+ */
+ @Private
+ public static final String NM_FPGA_RESOURCE_PREFIX =
+ NM_RESOURCE_PLUGINS + ".fpga.";
+
+ /**
+ * FPGA devices YARN may manage, as comma-separated device minor numbers,
+ * or "auto" to let the NodeManager discover them through the vendor plugin.
+ */
+ @Private
+ public static final String NM_FPGA_ALLOWED_DEVICES =
+ NM_FPGA_RESOURCE_PREFIX + "allowed-fpga-devices";
+
+ /**
+ * Path to the FPGA discovery executable, used when allowed devices is set
+ * to "auto". When empty, the location preferred by the vendor plugin is used.
+ */
+ @Private
+ public static final String NM_FPGA_PATH_TO_EXEC =
+ NM_FPGA_RESOURCE_PREFIX + "path-to-discovery-executables";
+
+ /**
+ * Fully-qualified class name of the single FPGA vendor plugin that handles
+ * device discovery, IP download and device configuration.
+ */
+ @Private
+ public static final String NM_FPGA_VENDOR_PLUGIN =
+ NM_FPGA_RESOURCE_PREFIX + "vendor-plugin.class";
+
+ /** Default vendor plugin: the Intel OpenCL FPGA plugin. */
+ @Private
+ public static final String DEFAULT_NM_FPGA_VENDOR_PLUGIN =
+ "org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin";
/** NM Webapp address.**/
public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
public static final int DEFAULT_NM_WEBAPP_PORT = 8042;
public static final String DEFAULT_NM_WEBAPP_ADDRESS = "0.0.0.0:" +
DEFAULT_NM_WEBAPP_PORT;
-
+
/** NM Webapp https address.**/
public static final String NM_WEBAPP_HTTPS_ADDRESS = NM_PREFIX
+ "webapp.https.address";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index dd9c6bd..2550c42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3512,7 +3512,8 @@
<property>
<description>
Enable additional discovery/isolation of resources on the NodeManager,
- split by comma. By default, this is empty. Acceptable values: { "yarn-io/gpu" }.
+ split by comma. By default, this is empty.
+ Acceptable values: { "yarn.io/gpu", "yarn.io/fpga" }.
</description>
<name>yarn.nodemanager.resource-plugins</name>
<value></value>
@@ -3559,6 +3560,43 @@
<value>http://localhost:3476/v1.0/docker/cli</value>
</property>
->>>>>>> theirs
+ <property>
+ <description>
+ Specify the vendor plugin that handles FPGA device discovery, IP download and configuration.
+ Only IntelFpgaOpenclPlugin is supported by default.
+ Only one vendor FPGA plugin can be configured per NodeManager, on the assumption that a host
+ contains FPGA cards from a single vendor; this also simplifies the design.
+ </description>
+ <name>yarn.nodemanager.resource-plugins.fpga.vendor-plugin.class</name>
+ <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin</value>
+ </property>
+
+ <property>
+ <description>
+ When yarn.nodemanager.resource-plugins.fpga.allowed-fpga-devices=auto is specified,
+ YARN NodeManager needs to run the FPGA discovery binary (currently only supported
+ by IntelFpgaOpenclPlugin) to get FPGA information.
+ When the value is empty (default), YARN NodeManager will try to locate the
+ discovery executable according to the vendor plugin's preference.
+ </description>
+ <name>yarn.nodemanager.resource-plugins.fpga.path-to-discovery-executables</name>
+ <value></value>
+ </property>
+
+ <property>
+ <description>
+ Specify FPGA devices which can be managed by YARN NodeManager, split by comma.
+ The number of FPGA devices will be reported to the RM to make scheduling decisions.
+ Set to auto to let YARN automatically discover FPGA resources from the
+ system.
+
+ Manually specify FPGA devices if the admin only wants a subset of FPGA devices managed by YARN.
+ At present, since only one major number can be configured in c-e.cfg, FPGA devices are
+ identified by their minor device numbers. A common approach to get the minor
+ device number of an FPGA is to run "aocl diagnose" and check the uevent for the device name.
+ </description>
+ <name>yarn.nodemanager.resource-plugins.fpga.allowed-fpga-devices</name>
+ <value>0,1</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
index db0b225..ad8c22f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
@@ -52,6 +52,7 @@ public class PrivilegedOperation {
ADD_PID_TO_CGROUP(""), //no CLI switch supported yet.
RUN_DOCKER_CMD("--run-docker"),
GPU("--module-gpu"),
+ FPGA("--module-fpga"),
LIST_AS_USER(""); //no CLI switch supported yet.
private final String option;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java
new file mode 100644
index 0000000..62dd3c4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
+
+
+/**
+ * Allocates FPGA devices to containers on behalf of an FPGA vendor plugin.
+ * Devices are grouped by a vendor-plugin-defined "type", which is taken into
+ * consideration when allocating.
+ *
+ * State-changing methods are synchronized on this instance. The
+ * {@code @VisibleForTesting} getters are not, and they return the live internal
+ * collections.
+ */
+public class FpgaResourceAllocator {
+
+  static final Log LOG = LogFactory.getLog(FpgaResourceAllocator.class);
+
+  // Every device added via addFpga, whether currently free or assigned.
+  private List<FpgaDevice> allowedFpgas = new LinkedList<>();
+
+  // key is resource type of FPGA, vendor plugin supported ID;
+  // value is the devices of that type that are currently unassigned
+  private LinkedHashMap<String, List<FpgaDevice>> availableFpga = new LinkedHashMap<>();
+
+  // key is requestor, aka. container ID; value is the devices assigned to it
+  private LinkedHashMap<String, List<FpgaDevice>> usedFpgaByRequestor = new LinkedHashMap<>();
+
+  private Context nmContext;
+
+  @VisibleForTesting
+  public HashMap<String, List<FpgaDevice>> getAvailableFpga() {
+    return availableFpga;
+  }
+
+  @VisibleForTesting
+  public List<FpgaDevice> getAllowedFpga() {
+    return allowedFpgas;
+  }
+
+  public FpgaResourceAllocator(Context ctx) {
+    this.nmContext = ctx;
+  }
+
+  /** @return number of unassigned devices, summed over all types */
+  @VisibleForTesting
+  public int getAvailableFpgaCount() {
+    int count = 0;
+    for (List<FpgaDevice> l : availableFpga.values()) {
+      count += l.size();
+    }
+    return count;
+  }
+
+  @VisibleForTesting
+  public HashMap<String, List<FpgaDevice>> getUsedFpga() {
+    return usedFpgaByRequestor;
+  }
+
+  /** @return number of assigned devices, summed over all requestors */
+  @VisibleForTesting
+  public int getUsedFpgaCount() {
+    int count = 0;
+    for (List<FpgaDevice> l : usedFpgaByRequestor.values()) {
+      count += l.size();
+    }
+    return count;
+  }
+
+  /**
+   * Result of an allocation: the devices a container may use and the devices
+   * that must be denied to it. Both lists are immutable snapshots.
+   */
+  public static class FpgaAllocation {
+
+    private List<FpgaDevice> allowed = Collections.emptyList();
+
+    private List<FpgaDevice> denied = Collections.emptyList();
+
+    FpgaAllocation(List<FpgaDevice> allowed, List<FpgaDevice> denied) {
+      if (allowed != null) {
+        this.allowed = ImmutableList.copyOf(allowed);
+      }
+      if (denied != null) {
+        this.denied = ImmutableList.copyOf(denied);
+      }
+    }
+
+    public List<FpgaDevice> getAllowed() {
+      return allowed;
+    }
+
+    public List<FpgaDevice> getDenied() {
+      return denied;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("\nFpgaAllocation\n\tAllowed:\n");
+      for (FpgaDevice device : allowed) {
+        sb.append("\t\t");
+        sb.append(device + "\n");
+      }
+      sb.append("\tDenied\n");
+      for (FpgaDevice device : denied) {
+        sb.append("\t\t");
+        sb.append(device + "\n");
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * An FPGA device identified by (type, major, minor).
+   *
+   * The remaining fields are descriptive and take no part in equality,
+   * hashing or ordering. Instances are written to and recovered from the NM
+   * state store via Java serialization, so renaming fields may break
+   * recovery of previously stored assignments.
+   */
+  public static class FpgaDevice implements Comparable<FpgaDevice>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Ordering consistent with equals: type, then major, then minor (nulls first).
+    private static final Comparator<FpgaDevice> NATURAL_ORDER =
+        Comparator.<FpgaDevice, String>comparing(FpgaDevice::getType,
+                Comparator.nullsFirst(Comparator.<String>naturalOrder()))
+            .thenComparing(FpgaDevice::getMajor,
+                Comparator.nullsFirst(Comparator.<Integer>naturalOrder()))
+            .thenComparing(FpgaDevice::getMinor,
+                Comparator.nullsFirst(Comparator.<Integer>naturalOrder()));
+
+    private String type;
+    private Integer major;
+    private Integer minor;
+    // IP file identifier. matrix multiplication for instance
+    private String IPID;
+    // the device name under /dev
+    private String devName;
+    // the alias device name. Intel use acl number acl0 to acl31
+    private String aliasDevName;
+    // lspci output's bus number: 02:00.00 (bus:slot.func)
+    private String busNum;
+    private String temperature;
+    private String cardPowerUsage;
+
+    public String getType() {
+      return type;
+    }
+
+    public Integer getMajor() {
+      return major;
+    }
+
+    public Integer getMinor() {
+      return minor;
+    }
+
+    public String getIPID() {
+      return IPID;
+    }
+
+    public void setIPID(String IPID) {
+      this.IPID = IPID;
+    }
+
+    public String getDevName() {
+      return devName;
+    }
+
+    public void setDevName(String devName) {
+      this.devName = devName;
+    }
+
+    public String getAliasDevName() {
+      return aliasDevName;
+    }
+
+    public void setAliasDevName(String aliasDevName) {
+      this.aliasDevName = aliasDevName;
+    }
+
+    public String getBusNum() {
+      return busNum;
+    }
+
+    public void setBusNum(String busNum) {
+      this.busNum = busNum;
+    }
+
+    public String getTemperature() {
+      return temperature;
+    }
+
+    public String getCardPowerUsage() {
+      return cardPowerUsage;
+    }
+
+    public FpgaDevice(String type, Integer major, Integer minor, String IPID) {
+      this.type = type;
+      this.major = major;
+      this.minor = minor;
+      this.IPID = IPID;
+    }
+
+    public FpgaDevice(String type, Integer major,
+        Integer minor, String IPID, String devName,
+        String aliasDevName, String busNum, String temperature, String cardPowerUsage) {
+      this.type = type;
+      this.major = major;
+      this.minor = minor;
+      this.IPID = IPID;
+      this.devName = devName;
+      this.aliasDevName = aliasDevName;
+      this.busNum = busNum;
+      this.temperature = temperature;
+      this.cardPowerUsage = cardPowerUsage;
+    }
+
+    /** Two devices are equal when type, major and minor all match (null-safe). */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof FpgaDevice)) {
+        return false;
+      }
+      FpgaDevice other = (FpgaDevice) obj;
+      return Objects.equals(type, other.type)
+          && Objects.equals(major, other.major)
+          && Objects.equals(minor, other.minor);
+    }
+
+    @Override
+    public int hashCode() {
+      // Same value as the former hand-rolled 31-based hash over (type, major, minor).
+      return Objects.hash(type, major, minor);
+    }
+
+    /** Orders by type, major, minor; returns 0 exactly when equals() is true. */
+    @Override
+    public int compareTo(FpgaDevice o) {
+      return NATURAL_ORDER.compare(this, o);
+    }
+
+    @Override
+    public String toString() {
+      return "FPGA Device:(Type: " + this.type + ", Major: " +
+          this.major + ", Minor: " + this.minor + ", IPID: " + this.IPID + ")";
+    }
+  }
+
+  /**
+   * Registers devices of the given type as allowed and available. Devices
+   * already known (by equals) are ignored.
+   */
+  public synchronized void addFpga(String type, List<FpgaDevice> list) {
+    availableFpga.putIfAbsent(type, new LinkedList<>());
+    for (FpgaDevice device : list) {
+      if (!allowedFpgas.contains(device)) {
+        allowedFpgas.add(device);
+        availableFpga.get(type).add(device);
+      }
+    }
+    LOG.info("Add a list of FPGA Devices: " + list);
+  }
+
+  /**
+   * Records that {@code device}, assigned to {@code requestor}, now carries
+   * {@code newIPID}. Logs a warning and changes nothing if the requestor has
+   * no such device assigned.
+   */
+  public synchronized void updateFpga(String requestor,
+      FpgaDevice device, String newIPID) {
+    List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor);
+    int index = (usedFpgas == null) ? -1 : findMatchedFpga(usedFpgas, device);
+    if (-1 != index) {
+      usedFpgas.get(index).setIPID(newIPID);
+      LOG.info("Update IPID to " + newIPID +
+          " for this allocated device:" + device);
+    } else {
+      LOG.warn("Failed to update FPGA due to unknown reason " +
+          "that no record for this allocated device:" + device);
+    }
+  }
+
+  /**
+   * @return index of the first entry equal to {@code item}, or -1. Uses equals()
+   *         rather than identity so that devices recovered from the state store
+   *         (deserialized copies) still match.
+   */
+  private synchronized int findMatchedFpga(List<FpgaDevice> devices, FpgaDevice item) {
+    return devices.indexOf(item);
+  }
+
+  /**
+   * Assign {@link FpgaAllocation} with preferred IPID, if no, with random FPGAs.
+   * Devices already programmed with {@code IPIDPreference} are taken first
+   * (never more than {@code count}). The request is then topped up with any
+   * other available device of the same type.
+   * @param type vendor plugin supported FPGA device type
+   * @param count requested FPGA slot count
+   * @param container container whose id is used as the requestor key
+   * @param IPIDPreference allocate slot with this IPID first
+   * @return allowed = the devices assigned by this call (empty when count is 0);
+   *         denied = every other device managed by this allocator, including
+   *         devices already assigned to other containers
+   * @throws ResourceHandlerException when the type is unknown, the count is
+   *         negative or exceeds the free devices of that type, or the state
+   *         store write fails
+   * */
+  public synchronized FpgaAllocation assignFpga(String type, long count,
+      Container container, String IPIDPreference) throws ResourceHandlerException {
+    List<FpgaDevice> currentAvailableFpga = availableFpga.get(type);
+    String requestor = container.getContainerId().toString();
+    if (null == currentAvailableFpga) {
+      throw new ResourceHandlerException("No such type of FPGA resource available: " + type);
+    }
+    if (count < 0 || count > currentAvailableFpga.size()) {
+      throw new ResourceHandlerException("Invalid FPGA request count or not enough, requested:" +
+          count + ", available:" + currentAvailableFpga.size());
+    }
+    if (count > 0) {
+      List<FpgaDevice> assignedFpgas = new LinkedList<>();
+      // Pass 1: devices already holding the preferred IP, capped at count.
+      // The iterator makes removal safe; index-based removal would skip elements.
+      Iterator<FpgaDevice> it = currentAvailableFpga.iterator();
+      while (it.hasNext() && assignedFpgas.size() < count) {
+        FpgaDevice device = it.next();
+        if (null != device.getIPID() &&
+            device.getIPID().equalsIgnoreCase(IPIDPreference)) {
+          assignedFpgas.add(device);
+          it.remove();
+        }
+      }
+      // Pass 2: fill the rest with any free device (enough exist per the check above).
+      while (assignedFpgas.size() < count) {
+        assignedFpgas.add(currentAvailableFpga.remove(0));
+      }
+
+      // Record in state store if we allocated anything
+      if (!assignedFpgas.isEmpty()) {
+        try {
+          nmContext.getNMStateStore().storeAssignedResources(container,
+              FPGA_URI, new LinkedList<>(assignedFpgas));
+        } catch (IOException e) {
+          // failed, give the allocation back
+          currentAvailableFpga.addAll(assignedFpgas);
+          throw new ResourceHandlerException(e);
+        }
+
+        // update state store success, update internal used FPGAs
+        usedFpgaByRequestor.putIfAbsent(requestor, new LinkedList<>());
+        usedFpgaByRequestor.get(requestor).addAll(assignedFpgas);
+      }
+
+      // Deny everything this container was not given: both free devices and
+      // devices already assigned to other containers.
+      List<FpgaDevice> denied = new LinkedList<>(allowedFpgas);
+      denied.removeAll(assignedFpgas);
+      return new FpgaAllocation(assignedFpgas, denied);
+    }
+    return new FpgaAllocation(null, allowedFpgas);
+  }
+
+  /**
+   * Re-registers the FPGA devices recorded for a container in its resource
+   * mappings, moving them from available to used.
+   * @throws ResourceHandlerException if the container is unknown, a recorded
+   *         resource is not an FpgaDevice, is not an allowed device, or is
+   *         already assigned to another requestor
+   */
+  public synchronized void recoverAssignedFpgas(ContainerId containerId) throws ResourceHandlerException {
+    Container c = nmContext.getContainers().get(containerId);
+    if (null == c) {
+      throw new ResourceHandlerException(
+          "This shouldn't happen, cannot find container with id="
+              + containerId);
+    }
+
+    for (Serializable fpgaDevice :
+        c.getResourceMappings().getAssignedResources(FPGA_URI)) {
+      if (!(fpgaDevice instanceof FpgaDevice)) {
+        throw new ResourceHandlerException(
+            "Trying to recover allocated FPGA devices, however it"
+                + " is not FpgaDevice type, this shouldn't happen");
+      }
+
+      // Make sure it is in allowed FPGA device.
+      if (!allowedFpgas.contains(fpgaDevice)) {
+        throw new ResourceHandlerException("Try to recover FpgaDevice = " + fpgaDevice
+            + " however it is not in allowed device list:" + StringUtils
+            .join(";", allowedFpgas));
+      }
+
+      // Make sure it is not occupied by anybody else
+      for (List<FpgaDevice> assigned : usedFpgaByRequestor.values()) {
+        if (assigned.contains(fpgaDevice)) {
+          throw new ResourceHandlerException("Try to recover FpgaDevice = " + fpgaDevice
+              + " however it is already assigned to others");
+        }
+      }
+      FpgaDevice device = (FpgaDevice) fpgaDevice;
+      usedFpgaByRequestor.putIfAbsent(containerId.toString(), new LinkedList<>());
+      usedFpgaByRequestor.get(containerId.toString()).add(device);
+      // remove them from available list
+      availableFpga.get(device.getType()).remove(device);
+    }
+  }
+
+  /** Returns every device assigned to {@code requestor} to the available pool. */
+  public synchronized void cleanupAssignFpgas(String requestor) {
+    List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor);
+    if (usedFpgas != null) {
+      for (FpgaDevice device : usedFpgas) {
+        // Add back to availableFpga
+        availableFpga.get(device.getType()).add(device);
+      }
+      usedFpgaByRequestor.remove(requestor);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
new file mode 100644
index 0000000..bf3d9b0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.AbstractFpgaVendorPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
+
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+public class FpgaResourceHandlerImpl implements ResourceHandler {
+
+  static final Log LOG = LogFactory.getLog(FpgaResourceHandlerImpl.class);
+
+  // Launch-environment variable through which a container names the IP it wants.
+  private static final String REQUEST_FPGA_IP_ID_KEY = "REQUESTED_FPGA_IP_ID";
+
+  private AbstractFpgaVendorPlugin vendorPlugin;
+
+  private FpgaResourceAllocator allocator;
+
+  private CGroupsHandler cGroupsHandler;
+
+  public static final String EXCLUDED_FPGAS_CLI_OPTION = "--excluded_fpgas";
+  public static final String CONTAINER_ID_CLI_OPTION = "--container_id";
+  private PrivilegedOperationExecutor privilegedOperationExecutor;
+
+  @VisibleForTesting
+  public FpgaResourceHandlerImpl(Context nmContext,
+      CGroupsHandler cGroupsHandler,
+      PrivilegedOperationExecutor privilegedOperationExecutor,
+      AbstractFpgaVendorPlugin plugin) {
+    this.allocator = new FpgaResourceAllocator(nmContext);
+    this.vendorPlugin = plugin;
+    FpgaDiscoverer.getInstance().setResourceHanderPlugin(vendorPlugin);
+    this.cGroupsHandler = cGroupsHandler;
+    this.privilegedOperationExecutor = privilegedOperationExecutor;
+  }
+
+  @VisibleForTesting
+  public FpgaResourceAllocator getFpgaAllocator() {
+    return allocator;
+  }
+
+  /**
+   * @return the value of REQUESTED_FPGA_IP_ID from the container's launch
+   *         environment, or an empty string when it is not set
+   */
+  public String getRequestedIPID(Container container) {
+    String r = container.getLaunchContext().getEnvironment().
+        get(REQUEST_FPGA_IP_ID_KEY);
+    return r == null ? "" : r;
+  }
+
+  /**
+   * Initializes the vendor plugin, registers the discovered devices with the
+   * allocator and initializes the cgroups DEVICES controller.
+   * @throws ResourceHandlerException if plugin initialization fails
+   */
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration configuration) throws ResourceHandlerException {
+    // The plugin should be initialized by FpgaDiscoverer already
+    if (!vendorPlugin.initPlugin(configuration)) {
+      throw new ResourceHandlerException("FPGA plugin initialization failed", null);
+    }
+    LOG.info("FPGA Plugin bootstrap success.");
+    // Get available devices minor numbers from toolchain or static configuration
+    List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList = FpgaDiscoverer.getInstance().discover();
+    allocator.addFpga(vendorPlugin.getFpgaType(), fpgaDeviceList);
+    this.cGroupsHandler.initializeCGroupController(CGroupsHandler.CGroupController.DEVICES);
+    return null;
+  }
+
+  /**
+   * Creates the container's devices cgroup and assigns FPGAs. It then runs
+   * the FPGA privileged operation with the denied device minors, and
+   * optionally reprograms the assigned devices with the requested IP.
+   * On failure, the assignment and the cgroup are rolled back.
+   * @return the operation that adds the container's pid to its devices cgroup
+   */
+  @Override
+  public List<PrivilegedOperation> preStart(Container container) throws ResourceHandlerException {
+    // 1. Get requested FPGA type and count, choose corresponding FPGA plugin(s)
+    // 2. Use allocator.assignFpga(type, count) to get FPGAAllocation
+    // 3. If required, download to ensure IP file exists and configure IP file for all devices
+    List<PrivilegedOperation> ret = new ArrayList<>();
+    String containerIdStr = container.getContainerId().toString();
+    Resource requestedResource = container.getResource();
+
+    // Create device cgroups for the container
+    cGroupsHandler.createCGroup(CGroupsHandler.CGroupController.DEVICES,
+        containerIdStr);
+
+    long deviceCount = requestedResource.getResourceValue(FPGA_URI);
+    LOG.info(containerIdStr + " requested " + deviceCount + " Intel FPGA(s)");
+    try {
+      // Read once; it is used for allocation, download and reprogramming below.
+      String requestedIPID = getRequestedIPID(container);
+
+      // allocate even request 0 FPGA because we need to deny all device numbers for this container
+      FpgaResourceAllocator.FpgaAllocation allocation = allocator.assignFpga(
+          vendorPlugin.getFpgaType(), deviceCount,
+          container, requestedIPID);
+      LOG.info("FpgaAllocation:" + allocation);
+
+      PrivilegedOperation privilegedOperation = new PrivilegedOperation(PrivilegedOperation.OperationType.FPGA,
+          Arrays.asList(CONTAINER_ID_CLI_OPTION, containerIdStr));
+      if (!allocation.getDenied().isEmpty()) {
+        List<Integer> denied = new ArrayList<>();
+        allocation.getDenied().forEach(device -> denied.add(device.getMinor()));
+        privilegedOperation.appendArgs(Arrays.asList(EXCLUDED_FPGAS_CLI_OPTION,
+            StringUtils.join(",", denied)));
+      }
+      privilegedOperationExecutor.executePrivilegedOperation(privilegedOperation, true);
+
+      if (deviceCount > 0) {
+        /**
+         * We only support flashing one IP for all devices now. If user don't set this
+         * environment variable, we assume that user's application can find the IP file by
+         * itself.
+         * Note that the IP downloading and reprogramming in advance in YARN is not necessary because
+         * the OpenCL application may find the IP file and reprogram device on the fly. But YARN do this
+         * for the containers will achieve the quickest reprogram path
+         *
+         * For instance, REQUESTED_FPGA_IP_ID = "matrix_mul" will make all devices
+         * programmed with matrix multiplication IP
+         *
+         * In the future, we may support "matrix_mul:1,gzip:2" format to support different IP
+         * for different devices
+         *
+         * */
+        String ipFilePath = vendorPlugin.downloadIP(requestedIPID, container.getWorkDir(),
+            container.getResourceSet().getLocalizedResources());
+        // Treat a null result like an empty one instead of failing with an NPE.
+        if (ipFilePath == null || ipFilePath.isEmpty()) {
+          LOG.warn("FPGA plugin failed to download IP but continue, please check the value of environment viable: " +
+              REQUEST_FPGA_IP_ID_KEY + " if you want yarn to help");
+        } else {
+          LOG.info("IP file path:" + ipFilePath);
+          for (FpgaResourceAllocator.FpgaDevice device : allocation.getAllowed()) {
+            String majorMinorNumber = device.getMajor() + ":" + device.getMinor();
+            String currentIPID = device.getIPID();
+            if (null != currentIPID &&
+                currentIPID.equalsIgnoreCase(requestedIPID)) {
+              LOG.info("IP already in device \"" + device.getAliasDevName() + "," +
+                  majorMinorNumber + "\", skip reprogramming");
+              continue;
+            }
+            if (vendorPlugin.configureIP(ipFilePath, majorMinorNumber)) {
+              // update the allocator that we update an IP of a device
+              allocator.updateFpga(containerIdStr, device, requestedIPID);
+              //TODO: update the node constraint label
+            } else {
+              LOG.warn("Failed to configure IP file " + ipFilePath + " on device \"" +
+                  device.getAliasDevName() + "," + majorMinorNumber + "\"");
+            }
+          }
+        }
+      }
+    } catch (ResourceHandlerException re) {
+      allocator.cleanupAssignFpgas(containerIdStr);
+      cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
+          containerIdStr);
+      throw re;
+    } catch (PrivilegedOperationException e) {
+      allocator.cleanupAssignFpgas(containerIdStr);
+      cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES, containerIdStr);
+      LOG.warn("Could not update cgroup for container", e);
+      throw new ResourceHandlerException(e);
+    }
+    //isolation operation
+    ret.add(new PrivilegedOperation(
+        PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+        PrivilegedOperation.CGROUP_ARG_PREFIX
+            + cGroupsHandler.getPathForCGroupTasks(
+            CGroupsHandler.CGroupController.DEVICES, containerIdStr)));
+    return ret;
+  }
+
+  /** Restores the allocator's view of the FPGAs assigned to a recovered container. */
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) throws ResourceHandlerException {
+    allocator.recoverAssignedFpgas(containerId);
+    return null;
+  }
+
+  /** Releases the container's FPGAs and deletes its devices cgroup. */
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId) throws ResourceHandlerException {
+    allocator.cleanupAssignFpgas(containerId.toString());
+    cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
+        containerId.toString());
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> teardown() throws ResourceHandlerException {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
index 73d6038..12d679b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
/**
@@ -42,7 +44,7 @@ public class ResourcePluginManager {
private static final Logger LOG =
LoggerFactory.getLogger(ResourcePluginManager.class);
private static final Set<String> SUPPORTED_RESOURCE_PLUGINS = ImmutableSet.of(
- GPU_URI);
+ GPU_URI, FPGA_URI);
private Map<String, ResourcePlugin> configuredPlugins = Collections.EMPTY_MAP;
@@ -77,6 +79,10 @@ public class ResourcePluginManager {
plugin = new GpuResourcePlugin();
}
+ if (resourceName.equals(FPGA_URI)) {
+ plugin = new FpgaResourcePlugin();
+ }
+
if (plugin == null) {
throw new YarnException(
"This shouldn't happen, plugin=" + resourceName
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/AbstractFpgaVendorPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/AbstractFpgaVendorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/AbstractFpgaVendorPlugin.java
new file mode 100644
index 0000000..60ea57c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/AbstractFpgaVendorPlugin.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * FPGA plugin interface for vendors to implement. Used by {@link FpgaDiscoverer} and
+ * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceHandlerImpl}
+ * to discover devices, download IP files and configure IP onto devices.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface AbstractFpgaVendorPlugin extends Configurable {
+
+  /**
+   * Check the vendor's toolchain and required environment.
+   *
+   * @param conf the NodeManager configuration
+   * @return whether the plugin initialized successfully
+   */
+  boolean initPlugin(Configuration conf);
+
+  /**
+   * Diagnose the devices using the vendor toolchain, without parsing the
+   * device information.
+   *
+   * @param timeout execution time limit for the toolchain, in milliseconds
+   * @return whether the diagnosis passed
+   */
+  boolean diagnose(int timeout);
+
+  /**
+   * Discover the vendor's FPGA devices within an execution time constraint.
+   *
+   * @param timeout the vendor plugin should return its result within this
+   *                time, in milliseconds
+   * @return the discovered devices; they are added to the
+   *         FpgaResourceAllocator for later scheduling. An empty list means
+   *         no device was found.
+   */
+  List<FpgaResourceAllocator.FpgaDevice> discover(int timeout);
+
+  /**
+   * Since all vendor plugins share one
+   * {@link org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator},
+   * which distinguishes FPGA devices by type, every vendor plugin must report
+   * its type.
+   *
+   * @return the FPGA type handled by this plugin
+   */
+  String getFpgaType();
+
+  /**
+   * Download the required IP file to a required directory. The plugin should
+   * check whether the IP file has already been downloaded.
+   *
+   * @param id the identifier of the IP file; comes from the application,
+   *           e.g. {@code matrix_multi_v1}
+   * @param dstDir the directory the plugin should download the IP file to
+   * @param localizedResources the container's localized resources, which can
+   *                           be searched for the IP file. Key is the
+   *                           localized file path, value is the list of soft
+   *                           link names
+   * @return the absolute path string of the IP file
+   */
+  String downloadIP(String id, String dstDir, Map<Path, List<String>> localizedResources);
+
+  /**
+   * Configure (program) an IP file onto a device.
+   *
+   * @param ipPath the absolute path of the IP file
+   * @param majorMinorNumber the device, in the format {@code major:minor}
+   * @return whether configuring the device succeeded
+   */
+  boolean configureIP(String ipPath, String majorMinorNumber);
+
+  @Override
+  void setConf(Configuration conf);
+
+  @Override
+  Configuration getConf();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java
new file mode 100644
index 0000000..8d32a18
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaDiscoverer.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class FpgaDiscoverer {
+
+ public static final Logger LOG = LoggerFactory.getLogger(
+ FpgaDiscoverer.class);
+
+ private static FpgaDiscoverer instance;
+
+ private Configuration conf = null;
+
+ private AbstractFpgaVendorPlugin plugin = null;
+
+ private List<FpgaResourceAllocator.FpgaDevice> currentFpgaInfo = null;
+
+ // shell command timeout
+ private static final int MAX_EXEC_TIMEOUT_MS = 10 * 1000;
+
+ static {
+ instance = new FpgaDiscoverer();
+ }
+
+ public static FpgaDiscoverer getInstance() {
+ return instance;
+ }
+
+ @VisibleForTesting
+ public synchronized static FpgaDiscoverer setInstance(FpgaDiscoverer newInstance) {
+ instance = newInstance;
+ return instance;
+ }
+
+ @VisibleForTesting
+ public synchronized void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public List<FpgaResourceAllocator.FpgaDevice> getCurrentFpgaInfo() {
+ return currentFpgaInfo;
+ }
+
+ public synchronized void setResourceHanderPlugin(AbstractFpgaVendorPlugin plugin) {
+ this.plugin = plugin;
+ }
+
+ public synchronized boolean diagnose() {
+ return this.plugin.diagnose(MAX_EXEC_TIMEOUT_MS);
+ }
+
+ public synchronized void initialize(Configuration conf) throws YarnException {
+ this.conf = conf;
+ this.plugin.initPlugin(conf);
+ // Try to diagnose FPGA
+ LOG.info("Trying to diagnose FPGA information ...");
+ if (!diagnose()) {
+ LOG.warn("Failed to pass FPGA devices diagnose");
+ }
+ }
+
+ /**
+ * get avialable devices minor numbers from toolchain or static configuration
+ * */
+ public synchronized List<FpgaResourceAllocator.FpgaDevice> discover() throws ResourceHandlerException {
+ List<FpgaResourceAllocator.FpgaDevice> list;
+ String allowed = this.conf.get(YarnConfiguration.NM_FPGA_ALLOWED_DEVICES);
+ // whatever static or auto discover, we always needs
+ // the vendor plugin to discover. For instance, IntelFpgaOpenclPlugin need to
+ // setup a mapping of <major:minor> to <aliasDevName>
+ list = this.plugin.discover(MAX_EXEC_TIMEOUT_MS);
+ if (0 == list.size()) {
+ throw new ResourceHandlerException("No FPGA devices detected!");
+ }
+ currentFpgaInfo = list;
+ if (allowed.equalsIgnoreCase(
+ YarnConfiguration.AUTOMATICALLY_DISCOVER_GPU_DEVICES)) {
+ return list;
+ } else if (allowed.matches("(\\d,)*\\d")){
+ String[] minors = allowed.split(",");
+ Iterator<FpgaResourceAllocator.FpgaDevice> iterator = list.iterator();
+ // remove the non-configured minor numbers
+ FpgaResourceAllocator.FpgaDevice t;
+ while (iterator.hasNext()) {
+ boolean valid = false;
+ t = iterator.next();
+ for (String minorNumber : minors) {
+ if (t.getMinor().toString().equals(minorNumber)) {
+ valid = true;
+ break;
+ }
+ }
+ if (!valid) {
+ iterator.remove();
+ }
+ }
+ // if the count of user configured is still larger than actual
+ if (list.size() != minors.length) {
+ LOG.warn("We continue although there're mistakes in user's configuration " +
+ YarnConfiguration.NM_FPGA_ALLOWED_DEVICES +
+ "user configured:" + allowed + ", while the real:" + list.toString());
+ }
+ } else {
+ throw new ResourceHandlerException("Invalid value configured for " +
+ YarnConfiguration.NM_FPGA_ALLOWED_DEVICES + ":\"" + allowed + "\"");
+ }
+ return list;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java
new file mode 100644
index 0000000..7511d8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaNodeResourceUpdateHandler.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
+
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
+
+public class FpgaNodeResourceUpdateHandler extends NodeResourceUpdaterPlugin {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      FpgaNodeResourceUpdateHandler.class);
+
+  /**
+   * Sets the FPGA_URI value of {@code res} to the number of FPGA devices
+   * found by {@link FpgaDiscoverer}. Leaves {@code res} untouched when no
+   * device is known.
+   *
+   * @param res the NodeManager's configured resource, updated in place
+   * @throws YarnException if FPGAs were found but FPGA_URI is not a
+   *         configured resource type
+   */
+  @Override
+  public void updateConfiguredResource(Resource res) throws YarnException {
+    LOG.info("Initializing configured FPGA resources for the NodeManager.");
+    List<FpgaResourceAllocator.FpgaDevice> list =
+        FpgaDiscoverer.getInstance().getCurrentFpgaInfo();
+    // getCurrentFpgaInfo() stays null until FpgaDiscoverer#discover() has
+    // run (presumably when the FPGA resource handler was never bootstrapped);
+    // treat that like "no devices" instead of throwing an NPE.
+    if (list == null || list.isEmpty()) {
+      LOG.info("Didn't find any usable FPGAs on the NodeManager.");
+      return;
+    }
+    long count = list.size();
+
+    Map<String, ResourceInformation> configuredResourceTypes =
+        ResourceUtils.getResourceTypes();
+    if (!configuredResourceTypes.containsKey(FPGA_URI)) {
+      throw new YarnException("Wrong configurations, found " + count +
+          " usable FPGAs, however " + FPGA_URI
+          + " resource-type is not configured inside"
+          + " resource-types.xml, please configure it to enable FPGA feature or"
+          + " remove " + FPGA_URI + " from "
+          + YarnConfiguration.NM_RESOURCE_PLUGINS);
+    }
+
+    res.setResourceValue(FPGA_URI, count);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaResourcePlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaResourcePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaResourcePlugin.java
new file mode 100644
index 0000000..44d093e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/FpgaResourcePlugin.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceHandlerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo;
+
+public class FpgaResourcePlugin implements ResourcePlugin {
+  private static final Log LOG = LogFactory.getLog(FpgaResourcePlugin.class);
+
+  // Created on the first createResourceHandler() call and reused afterwards.
+  private ResourceHandler fpgaResourceHandler = null;
+
+  // Vendor-specific plugin instantiated from configuration in initialize().
+  private AbstractFpgaVendorPlugin vendorPlugin = null;
+  private FpgaNodeResourceUpdateHandler fpgaNodeResourceUpdateHandler = null;
+
+  /**
+   * Loads the vendor plugin class named by
+   * YarnConfiguration#NM_FPGA_VENDOR_PLUGIN (defaulting to
+   * YarnConfiguration#DEFAULT_NM_FPGA_VENDOR_PLUGIN) and instantiates it
+   * through ReflectionUtils with the given configuration.
+   *
+   * @throws YarnRuntimeException if the class cannot be found or does not
+   *         implement AbstractFpgaVendorPlugin
+   */
+  private AbstractFpgaVendorPlugin createFpgaVendorPlugin(Configuration conf) {
+    String pluginClassName = conf.get(YarnConfiguration.NM_FPGA_VENDOR_PLUGIN,
+        YarnConfiguration.DEFAULT_NM_FPGA_VENDOR_PLUGIN);
+    LOG.info("Using FPGA vendor plugin: " + pluginClassName);
+    Class<?> pluginClass;
+    try {
+      pluginClass = Class.forName(pluginClassName);
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException("Could not instantiate FPGA vendor plugin: "
+          + pluginClassName, e);
+    }
+    if (!AbstractFpgaVendorPlugin.class.isAssignableFrom(pluginClass)) {
+      throw new YarnRuntimeException("Class: " + pluginClassName
+          + " not instance of " + AbstractFpgaVendorPlugin.class.getCanonicalName());
+    }
+    return (AbstractFpgaVendorPlugin) ReflectionUtils.newInstance(pluginClass,
+        conf);
+  }
+
+  /**
+   * Creates the vendor plugin, registers it with the shared FpgaDiscoverer,
+   * initializes the discoverer and prepares the node resource update handler.
+   */
+  @Override
+  public void initialize(Context context) throws YarnException {
+    Configuration conf = context.getConf();
+    vendorPlugin = createFpgaVendorPlugin(conf);
+    FpgaDiscoverer discoverer = FpgaDiscoverer.getInstance();
+    discoverer.setResourceHanderPlugin(vendorPlugin);
+    discoverer.initialize(conf);
+    fpgaNodeResourceUpdateHandler = new FpgaNodeResourceUpdateHandler();
+  }
+
+  /** Returns the FPGA resource handler, creating it on first use. */
+  @Override
+  public ResourceHandler createResourceHandler(
+      Context nmContext, CGroupsHandler cGroupsHandler,
+      PrivilegedOperationExecutor privilegedOperationExecutor) {
+    if (fpgaResourceHandler != null) {
+      return fpgaResourceHandler;
+    }
+    fpgaResourceHandler = new FpgaResourceHandlerImpl(nmContext,
+        cGroupsHandler, privilegedOperationExecutor, vendorPlugin);
+    return fpgaResourceHandler;
+  }
+
+  /** Returns the handler created by initialize(); null before that. */
+  @Override
+  public NodeResourceUpdaterPlugin getNodeResourceHandlerInstance() {
+    return fpgaNodeResourceUpdateHandler;
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void cleanup() throws YarnException {
+  }
+
+  /** No Docker command plugin is provided for FPGA. */
+  @Override
+  public DockerCommandPlugin getDockerCommandPluginInstance() {
+    return null;
+  }
+
+  /** No NMResourceInfo is published for FPGA. */
+  @Override
+  public NMResourceInfo getNMResourceInfo() throws YarnException {
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7225ec0c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java
new file mode 100644
index 0000000..f2e82b8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/fpga/IntelFpgaOpenclPlugin.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.fpga.FpgaResourceAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Intel FPGA for OpenCL plugin.
+ * The key points are:
+ * 1. It uses Intel's toolchain "aocl" to discover devices/reprogram IP to the device
+ * before container launch to achieve a quickest reprogramming path
+ * 2. It avoids reprogramming by maintaining a mapping of device to FPGA IP ID
+ * 3. It assume IP file is distributed to container directory
+ */
+public class IntelFpgaOpenclPlugin implements AbstractFpgaVendorPlugin {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ IntelFpgaOpenclPlugin.class);
+
+ private boolean initialized = false;
+ private Configuration conf;
+ private InnerShellExecutor shell;
+
+ protected static final String DEFAULT_BINARY_NAME = "aocl";
+
+ protected static final String ALTERAOCLSDKROOT_NAME = "ALTERAOCLSDKROOT";
+
+ private String pathToExecutable = null;
+
+ // a mapping of major:minor number to acl0-31
+ private Map<String, String> aliasMap;
+
+ public IntelFpgaOpenclPlugin() {
+ this.shell = new InnerShellExecutor();
+ }
+
+ public String getDefaultBinaryName() {
+ return DEFAULT_BINARY_NAME;
+ }
+
+ public String getDefaultPathToExecutable() {
+ return System.getenv(ALTERAOCLSDKROOT_NAME);
+ }
+
+ public static String getDefaultPathEnvName() {
+ return ALTERAOCLSDKROOT_NAME;
+ }
+
+ @VisibleForTesting
+ public String getPathToExecutable() {
+ return pathToExecutable;
+ }
+
+ public void setPathToExecutable(String pathToExecutable) {
+ this.pathToExecutable = pathToExecutable;
+ }
+
+ @VisibleForTesting
+ public void setShell(InnerShellExecutor shell) {
+ this.shell = shell;
+ }
+
+ public Map<String, String> getAliasMap() {
+ return aliasMap;
+ }
+
+   /**
+    * Check the Intel FPGA for OpenCL toolchain.
+    *
+    * The {@code aocl} executable is looked up at the path configured by
+    * YarnConfiguration#NM_FPGA_PATH_TO_EXEC, then under
+    * {@code $ALTERAOCLSDKROOT/bin}. If neither exists, the bare binary name is
+    * used. Once an earlier call succeeded, this returns true immediately; the
+    * alias map is reset on every call.
+    *
+    * @param conf configuration to read the executable path from
+    * @return whether {@code aocl diagnose} passed
+    */
+   @Override
+   public boolean initPlugin(Configuration conf) {
+     this.aliasMap = new HashMap<>();
+     if (this.initialized) {
+       return true;
+     }
+     // Find the proper toolchain, mainly aocl
+     String pluginDefaultBinaryName = getDefaultBinaryName();
+     String executable = conf.get(YarnConfiguration.NM_FPGA_PATH_TO_EXEC, "");
+     if (executable.isEmpty()) {
+       executable = pluginDefaultBinaryName;
+     }
+     // Validate file existence
+     File binaryPath = new File(executable);
+     if (!binaryPath.exists()) {
+       // The configured binary doesn't exist: fall back to the SDK location
+       // from the environment, or to the bare binary name.
+       LOG.warn("Failed to find FPGA discoverer executable configured in " +
+           YarnConfiguration.NM_FPGA_PATH_TO_EXEC +
+           ", please check! Try default path");
+       executable = pluginDefaultBinaryName;
+       // Try to find in plugin's preferred path
+       String pluginDefaultPreferredPath = getDefaultPathToExecutable();
+       if (null == pluginDefaultPreferredPath) {
+         LOG.warn("Failed to find FPGA discoverer executable from system environment " +
+             getDefaultPathEnvName()+
+             ", please check your environment!");
+       } else {
+         binaryPath = new File(pluginDefaultPreferredPath + "/bin", pluginDefaultBinaryName);
+         if (binaryPath.exists()) {
+           // Use the binary itself, not the SDK root directory.
+           executable = binaryPath.getAbsolutePath();
+         } else {
+           executable = pluginDefaultBinaryName;
+           LOG.warn("Failed to find FPGA discoverer executable in " +
+               pluginDefaultPreferredPath + ", file doesn't exist! Use default binary " + executable);
+         }
+       }
+     }
+     setPathToExecutable(executable);
+     // 10 seconds for the toolchain diagnosis
+     if (!diagnose(10 * 1000)) {
+       LOG.warn("Intel FPGA for OpenCL diagnose failed!");
+       this.initialized = false;
+     } else {
+       this.initialized = true;
+     }
+     return this.initialized;
+   }
+
+   /**
+    * Runs {@code aocl diagnose} and parses its output into devices.
+    *
+    * @param timeout time limit for the diagnose command, in milliseconds
+    * @return the parsed devices; empty when there is no output
+    */
+   @Override
+   public List<FpgaResourceAllocator.FpgaDevice> discover(int timeout) {
+     List<FpgaResourceAllocator.FpgaDevice> devices = new LinkedList<>();
+     String diagnoseOutput = getDiagnoseInfo(timeout);
+     if (diagnoseOutput != null) {
+       parseDiagnoseInfo(diagnoseOutput, devices);
+     }
+     return devices;
+   }
+
+   /**
+    * Runs the external commands the plugin depends on. Tests can substitute
+    * it through {@link IntelFpgaOpenclPlugin#setShell(InnerShellExecutor)}.
+    */
+   public static class InnerShellExecutor {
+
+     /**
+      * Reads the device numbers of {@code /dev/<devName>} with {@code stat}.
+      *
+      * @param devName device file name under /dev
+      * @return "major:minor" in decimal, or null if stat failed or its output
+      *         could not be parsed
+      */
+     public String getMajorAndMinorNumber(String devName) {
+       String output = null;
+       Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
+           new String[]{"stat", "-c", "%t:%T", "/dev/" + devName});
+       try {
+         LOG.debug("Get FPGA major-minor numbers from /dev/{}", devName);
+         shexec.execute();
+         String statOutput = shexec.getOutput();
+         LOG.debug("stat output:{}", statOutput);
+         // stat prints the major and minor numbers in hexadecimal
+         String[] strs = statOutput.trim().split(":");
+         if (strs.length == 2) {
+           output = Integer.parseInt(strs[0], 16) + ":" + Integer.parseInt(strs[1], 16);
+         } else {
+           LOG.warn("Unexpected stat output for /dev/" + devName + ": " + statOutput);
+         }
+       } catch (IOException e) {
+         String msg =
+             "Failed to get major-minor number from reading /dev/" + devName;
+         LOG.warn(msg);
+         LOG.debug("Command output:" + shexec.getOutput() + ", exit code:" +
+             shexec.getExitCode());
+       } catch (NumberFormatException e) {
+         LOG.warn("Unexpected stat output for /dev/" + devName + ": " +
+             shexec.getOutput());
+       }
+       return output;
+     }
+
+     /**
+      * Runs {@code <binary> diagnose} and returns its standard output.
+      *
+      * @param binary the aocl executable
+      * @param timeout time after which the command is killed, in
+      *                milliseconds; values &lt;= 0 disable the timeout
+      * @return the command's output, also when it exits non-zero
+      */
+     public String runDiagnose(String binary, int timeout) {
+       Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
+           new String[]{binary, "diagnose"}, null, null, timeout);
+       try {
+         shexec.execute();
+       } catch (IOException e) {
+         // aocl diagnose exits with code 1 even when it succeeds.
+         // We ignore it because we only want the output.
+         String msg =
+             "Failed to execute " + binary + " diagnose, exception message:" + e
+             .getMessage() + ", continue ...";
+         LOG.warn(msg);
+         LOG.debug("Command output:" + shexec.getOutput());
+       }
+       return shexec.getOutput();
+     }
+
+   }
+
+   /** Per-device header alias acl0 .. acl31, not followed by another digit. */
+   private static final Pattern DEVICE_ALIAS_PATTERN =
+       Pattern.compile("acl(?:3[01]|[12][0-9]|[0-9])(?![0-9])");
+
+   /** End marker of a device section in the diagnose output. */
+   private static final Pattern SECTION_END_PATTERN =
+       Pattern.compile("(?i)DIAGNOSTIC_PASSED");
+
+   /**
+    * Field patterns applied to each device section, in this order: device
+    * name "(name)" followed by a newline, "bus:slot.func = x,",
+    * "FPGA temperature = x" and "Total Card Power Usage = x".
+    */
+   private static final Pattern[] FIELD_PATTERNS = {
+       Pattern.compile("\\(.*\\)\n"),
+       Pattern.compile("(?i)bus:slot.func\\s=\\s.*,"),
+       Pattern.compile("(?i)FPGA temperature\\s=\\s.*"),
+       Pattern.compile("(?i)Total\\sCard\\sPower\\sUsage\\s=\\s.*")};
+
+   /**
+    * Parses the output of {@code aocl diagnose} into devices.
+    *
+    * One real sample output of Intel FPGA SDK 17.0's "aocl diagnose" is as below:
+    * <pre>{@code
+    * aocl diagnose: Running diagnose from /home/fpga/intelFPGA_pro/17.0/hld/board/nalla_pcie/linux64/libexec
+    *
+    * ------------------------- acl0 -------------------------
+    * Vendor: Nallatech ltd
+    *
+    * Phys Dev Name Status Information
+    *
+    * aclnalla_pcie0Passed nalla_pcie (aclnalla_pcie0)
+    * PCIe dev_id = 2494, bus:slot.func = 02:00.00, Gen3 x8
+    * FPGA temperature = 54.4 degrees C.
+    * Total Card Power Usage = 31.7 Watts.
+    * Device Power Usage = 0.0 Watts.
+    *
+    * DIAGNOSTIC_PASSED
+    * ---------------------------------------------------------
+    * }</pre>
+    *
+    * Per Intel's guide, the output (probably from an older SDK version) is as
+    * below:
+    * <pre>{@code
+    * aocl diagnose: Running diagnostic from ALTERAOCLSDKROOT/board/<board_name>/
+    * <platform>/libexec
+    * Verified that the kernel mode driver is installed on the host machine.
+    * Using board package from vendor: <board_vendor_name>
+    * Querying information for all supported devices that are installed on the host
+    * machine ...
+    *
+    * device_name Status Information
+    *
+    * acl0 Passed <descriptive_board_name>
+    * PCIe dev_id = <device_ID>, bus:slot.func = 02:00.00,
+    * at Gen 2 with 8 lanes.
+    * FPGA temperature=43.0 degrees C.
+    * acl1 Passed <descriptive_board_name>
+    * PCIe dev_id = <device_ID>, bus:slot.func = 03:00.00,
+    * at Gen 2 with 8 lanes.
+    * FPGA temperature = 35.0 degrees C.
+    *
+    * Found 2 active device(s) installed on the host machine, to perform a full
+    * diagnostic on a specific device, please run aocl diagnose <device_name>
+    *
+    * DIAGNOSTIC_PASSED
+    * }</pre>
+    * Only the first format is supported.
+    *
+    * Each section starts after an {@code aclN} header and ends at the next
+    * DIAGNOSTIC_PASSED. Devices whose name or major:minor number cannot be
+    * determined are skipped. Every added device is also recorded in the alias
+    * map.
+    *
+    * @param output the raw diagnose output
+    * @param list receives the parsed devices
+    */
+   public void parseDiagnoseInfo(String output, List<FpgaResourceAllocator.FpgaDevice> list) {
+     if (!output.contains("DIAGNOSTIC_PASSED")) {
+       return;
+     }
+     Matcher headerStartMatcher = DEVICE_ALIAS_PATTERN.matcher(output);
+     Matcher headerEndMatcher = SECTION_END_PATTERN.matcher(output);
+     while (headerStartMatcher.find()) {
+       String aliasName = headerStartMatcher.group();
+       int sectionStartIndex = headerStartMatcher.end();
+       if (!headerEndMatcher.find(sectionStartIndex)) {
+         LOG.warn("Unsupported diagnose output");
+         return;
+       }
+       String section = output.substring(sectionStartIndex, headerEndMatcher.start());
+       String[] fields = new String[FIELD_PATTERNS.length];
+       for (int i = 0; i < FIELD_PATTERNS.length; i++) {
+         Matcher fieldMatcher = FIELD_PATTERNS[i].matcher(section);
+         if (!fieldMatcher.find()) {
+           LOG.warn("Couldn't find " + FIELD_PATTERNS[i].pattern() + " pattern");
+           fields[i] = "";
+           continue;
+         }
+         String tempFieldValue = fieldMatcher.group().trim();
+         if (i == 0) {
+           // Device name: strip the surrounding parentheses.
+           fields[i] = tempFieldValue.substring(1, tempFieldValue.length() - 1);
+         } else {
+           // "key = value": keep the value without its last character (the
+           // trailing ',' or '.' in the sample outputs above).
+           String[] keyValue = tempFieldValue.split("=");
+           String value = keyValue.length > 1 ? keyValue[1].trim() : "";
+           fields[i] = value.isEmpty() ? "" : value.substring(0, value.length() - 1);
+         }
+       }
+       if (fields[0].isEmpty()) {
+         // Without a name we would stat /dev/ itself and report a bogus device.
+         LOG.warn("Couldn't find device name of " + aliasName + ", skip it");
+         continue;
+       }
+       String majorMinorNumber = this.shell.getMajorAndMinorNumber(fields[0]);
+       if (null != majorMinorNumber) {
+         String[] mmn = majorMinorNumber.split(":");
+         this.aliasMap.put(majorMinorNumber, aliasName);
+         list.add(new FpgaResourceAllocator.FpgaDevice(getFpgaType(),
+             Integer.parseInt(mmn[0]),
+             Integer.parseInt(mmn[1]), null,
+             fields[0], aliasName, fields[1], fields[2], fields[3]));
+       }
+     }
+   }
+
+  /**
+   * Delegate to the shell helper's runDiagnose using the configured
+   * executable path.
+   * @param timeout timeout forwarded unchanged to the shell helper
+   * @return the raw diagnose output from the shell helper; callers in this
+   *         class treat null as "no output"
+   */
+  public String getDiagnoseInfo(int timeout) {
+    final String executable = this.pathToExecutable;
+    return this.shell.runDiagnose(executable, timeout);
+  }
+
+  /**
+   * Check whether the diagnose output reports a passing device.
+   * @param timeout timeout forwarded to {@link #getDiagnoseInfo(int)}
+   * @return true only when output was produced and it contains
+   *         "DIAGNOSTIC_PASSED"
+   */
+  @Override
+  public boolean diagnose(int timeout) {
+    final String diagnoseOutput = getDiagnoseInfo(timeout);
+    return diagnoseOutput != null && diagnoseOutput.contains("DIAGNOSTIC_PASSED");
+  }
+
+  /**
+   * Return the FPGA type reported by this plugin.
+   * The value names the OpenCL platform (Intel OpenCL) rather than a board,
+   * and it is the type attached to every device built by parseDiagnoseInfo.
+   */
+  @Override
+  public String getFpgaType() {
+    final String openclPlatform = "IntelOpenCL";
+    return openclPlatform;
+  }
+
+  /**
+   * Find the IP (.aocx) file for {@code id} among the localized resources.
+   * A resource matches when its file name contains {@code id} (ignoring case)
+   * and ends with ".aocx"; the first match in map iteration order wins.
+   * @param id the IP id taken from the container environment
+   * @param dstDir not used; the .aocx file is expected to be localized already
+   * @param localizedResources the container's localized resources; may be null
+   * @return the URI string of the matching file, or "" when id is empty,
+   *         the resources are null, or nothing matched
+   */
+  @Override
+  public String downloadIP(String id, String dstDir, Map<Path, List<String>> localizedResources) {
+    // Assume .aocx IP file is distributed by DS to local dir
+    String r = "";
+    LOG.info("Got environment: " + id + ", search IP file in localized resources");
+    if (null == id || id.isEmpty()) {
+      LOG.warn("IP_ID environment is empty, skip downloading");
+      return r;
+    }
+    if (localizedResources == null) {
+      LOG.warn("Localized resource is null!");
+      return r;
+    }
+    // Lower-case with Locale.ROOT so matching does not depend on the default
+    // locale (in a Turkish locale 'I' lower-cases to a dotless 'ı').
+    String lowerCaseId = id.toLowerCase(java.util.Locale.ROOT);
+    for (Path path : localizedResources.keySet()) {
+      LOG.debug("Check:" + path.toUri().toString());
+      String fileName = path.getName();
+      if (fileName.toLowerCase(java.util.Locale.ROOT).contains(lowerCaseId)
+          && fileName.endsWith(".aocx")) {
+        r = path.toUri().toString();
+        LOG.debug("Found: " + r);
+        break;
+      }
+    }
+    return r;
+  }
+
+  /**
+   * Program one device offline with "aocl program".
+   * A failure here is tolerated because the application always invokes the
+   * API to program the device itself. The offline reprogramming only makes
+   * the application's own programming step faster.
+   * @param ipPath the absolute path to the aocx IP file
+   * @param majorMinorNumber major:minor string of the device to program
+   * @return true if "aocl program" exited with 0; false if it failed, exited
+   *         non-zero, or no alias is known for {@code majorMinorNumber}
+   * */
+  @Override
+  public boolean configureIP(String ipPath, String majorMinorNumber) {
+    // "aocl program" addresses a device by alias (e.g. "acl0"), so map
+    // "major:minor" back to the alias recorded by parseDiagnoseInfo.
+    String aclName = this.aliasMap.get(majorMinorNumber);
+    if (null == aclName) {
+      // A null element in the command would fail with an unchecked
+      // NullPointerException (presumably from ProcessBuilder inside Shell)
+      // instead of the false return callers expect.
+      LOG.warn("No FPGA alias known for device " + majorMinorNumber
+          + ", skip programming " + ipPath);
+      return false;
+    }
+    Shell.ShellCommandExecutor shexec = new Shell.ShellCommandExecutor(
+        new String[]{this.pathToExecutable, "program", aclName, ipPath});
+    try {
+      shexec.execute();
+      if (0 != shexec.getExitCode()) {
+        LOG.warn("Intel aocl program " + ipPath + " to " + aclName
+            + " exited with code " + shexec.getExitCode());
+        return false;
+      }
+      LOG.debug(shexec.getOutput());
+      LOG.info("Intel aocl program " + ipPath + " to " + aclName + " successfully");
+      return true;
+    } catch (IOException e) {
+      // Log through the class logger (with cause) instead of printStackTrace.
+      LOG.error("Intel aocl program " + ipPath + " to " + aclName + " failed!", e);
+      return false;
+    }
+  }
+
+  /**
+   * Keep the given configuration for later retrieval via getConf.
+   * @param conf the configuration to store; stored as-is
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * @return the configuration currently held by this plugin (as last
+   *         stored by setConf)
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org