You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2018/05/22 20:14:56 UTC
[28/50] [abbrv] hadoop git commit: YARN-7530. Refactored YARN service
API project location. Contributed by Chandni Singh
YARN-7530. Refactored YARN service API project location.
Contributed by Chandni Singh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a23ff8d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a23ff8d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a23ff8d8
Branch: refs/heads/HDDS-48
Commit: a23ff8d88001ad8e4ac4c36fc1f7691d193dc1d0
Parents: 89f5911
Author: Eric Yang <ey...@apache.org>
Authored: Fri May 18 17:29:10 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Fri May 18 17:29:10 2018 -0400
----------------------------------------------------------------------
.../resources/assemblies/hadoop-yarn-dist.xml | 2 +-
.../dev-support/findbugs-exclude.xml | 20 -
.../hadoop-yarn-services-api/pom.xml | 144 ----
.../yarn/service/client/ApiServiceClient.java | 598 --------------
.../client/SystemServiceManagerImpl.java | 391 ---------
.../yarn/service/client/package-info.java | 28 -
.../hadoop/yarn/service/webapp/ApiServer.java | 818 -------------------
.../yarn/service/webapp/ApiServerWebApp.java | 161 ----
.../yarn/service/webapp/package-info.java | 28 -
.../definition/YARN-Services-Examples.md | 444 ----------
...RN-Simplified-V1-API-Layer-For-Services.yaml | 594 --------------
.../src/main/resources/log4j-server.properties | 76 --
.../src/main/resources/webapps/api-server/app | 16 -
.../src/main/webapp/WEB-INF/web.xml | 36 -
.../hadoop/yarn/service/ServiceClientTest.java | 210 -----
.../hadoop/yarn/service/TestApiServer.java | 623 --------------
.../service/client/TestApiServiceClient.java | 314 -------
.../client/TestSystemServiceManagerImpl.java | 182 -----
.../src/test/resources/example-app.json | 16 -
.../src/test/resources/log4j.properties | 19 -
.../resources/system-services/bad/bad.yarnfile | 16 -
.../sync/user1/example-app1.yarnfile | 16 -
.../sync/user1/example-app2.yarnfile | 16 -
.../sync/user1/example-app3.json | 16 -
.../sync/user2/example-app1.yarnfile | 16 -
.../sync/user2/example-app2.yarnfile | 16 -
.../dev-support/findbugs-exclude.xml | 20 +
.../hadoop-yarn-services-api/pom.xml | 144 ++++
.../yarn/service/client/ApiServiceClient.java | 598 ++++++++++++++
.../client/SystemServiceManagerImpl.java | 391 +++++++++
.../yarn/service/client/package-info.java | 28 +
.../hadoop/yarn/service/webapp/ApiServer.java | 818 +++++++++++++++++++
.../yarn/service/webapp/ApiServerWebApp.java | 161 ++++
.../yarn/service/webapp/package-info.java | 28 +
.../definition/YARN-Services-Examples.md | 444 ++++++++++
...RN-Simplified-V1-API-Layer-For-Services.yaml | 594 ++++++++++++++
.../src/main/resources/log4j-server.properties | 76 ++
.../src/main/resources/webapps/api-server/app | 16 +
.../src/main/webapp/WEB-INF/web.xml | 36 +
.../hadoop/yarn/service/ServiceClientTest.java | 210 +++++
.../hadoop/yarn/service/TestApiServer.java | 623 ++++++++++++++
.../service/client/TestApiServiceClient.java | 314 +++++++
.../client/TestSystemServiceManagerImpl.java | 182 +++++
.../src/test/resources/example-app.json | 16 +
.../src/test/resources/log4j.properties | 19 +
.../resources/system-services/bad/bad.yarnfile | 16 +
.../sync/user1/example-app1.yarnfile | 16 +
.../sync/user1/example-app2.yarnfile | 16 +
.../sync/user1/example-app3.json | 16 +
.../sync/user2/example-app1.yarnfile | 16 +
.../sync/user2/example-app2.yarnfile | 16 +
.../hadoop-yarn-services/pom.xml | 1 +
.../hadoop-yarn-applications/pom.xml | 1 -
53 files changed, 4816 insertions(+), 4816 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
----------------------------------------------------------------------
diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
index 382c967..a2ea08c 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
@@ -105,7 +105,7 @@
</includes>
</fileSet>
<fileSet>
- <directory>hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/target</directory>
+ <directory>hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/target</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
<includes>
<include>*-sources.jar</include>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml
deleted file mode 100644
index b89146a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/dev-support/findbugs-exclude.xml
+++ /dev/null
@@ -1,20 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<FindBugsFilter>
-
-</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml
deleted file mode 100644
index 354c9b5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml
+++ /dev/null
@@ -1,144 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-applications</artifactId>
- <version>3.2.0-SNAPSHOT</version>
- </parent>
- <artifactId>hadoop-yarn-services-api</artifactId>
- <name>Apache Hadoop YARN Services API</name>
- <packaging>jar</packaging>
- <description>Hadoop YARN REST APIs for services</description>
-
- <build>
-
- <!-- resources are filtered for dynamic updates. This gets build info in-->
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- <filtering>true</filtering>
- </resource>
- <resource>
- <directory>src/main/scripts/</directory>
- <filtering>true</filtering>
- </resource>
- </resources>
-
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <!-- The configuration of the plugin -->
- <configuration>
- <!-- Configuration of the archiver -->
- <archive>
- <manifestEntries>
- <mode>development</mode>
- <url>${project.url}</url>
- </manifestEntries>
- <!-- Manifest specific configuration -->
- <manifest>
- </manifest>
- </archive>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>**/*.json</exclude>
- <exclude>**/*.yarnfile</exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
- <reporting>
- </reporting>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-services-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-webapp</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.inject</groupId>
- <artifactId>guice</artifactId>
- </dependency>
- <dependency>
- <groupId>javax.ws.rs</groupId>
- <artifactId>jsr311-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- ======================================================== -->
- <!-- Test dependencies -->
- <!-- ======================================================== -->
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
deleted file mode 100644
index a8e2f51..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
+++ /dev/null
@@ -1,598 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.service.client;
-
-import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.List;
-import java.util.Map;
-
-import javax.ws.rs.core.MediaType;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.client.api.AppAdminClient;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.service.api.records.Component;
-import org.apache.hadoop.yarn.service.api.records.ComponentState;
-import org.apache.hadoop.yarn.service.api.records.Container;
-import org.apache.hadoop.yarn.service.api.records.ContainerState;
-import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.api.records.ServiceState;
-import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
-import org.apache.hadoop.yarn.service.conf.RestApiConstants;
-import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
-import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
-import org.apache.hadoop.yarn.util.RMHAUtils;
-import org.codehaus.jackson.map.PropertyNamingStrategy;
-import org.eclipse.jetty.util.UrlEncoded;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.WebResource.Builder;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-
-import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*;
-
-/**
- * The rest API client for users to manage services on YARN.
- */
-public class ApiServiceClient extends AppAdminClient {
- private static final Logger LOG =
- LoggerFactory.getLogger(ApiServiceClient.class);
- protected YarnClient yarnClient;
-
- @Override protected void serviceInit(Configuration configuration)
- throws Exception {
- yarnClient = YarnClient.createYarnClient();
- addService(yarnClient);
- super.serviceInit(configuration);
- }
-
- /**
- * Calculate Resource Manager address base on working REST API.
- */
- private String getRMWebAddress() {
- Configuration conf = getConfig();
- String scheme = "http://";
- String path = "/app/v1/services/version";
- String rmAddress = conf
- .get("yarn.resourcemanager.webapp.address");
- if (YarnConfiguration.useHttps(conf)) {
- scheme = "https://";
- rmAddress = conf
- .get("yarn.resourcemanager.webapp.https.address");
- }
- boolean useKerberos = UserGroupInformation.isSecurityEnabled();
- List<String> rmServers = RMHAUtils
- .getRMHAWebappAddresses(new YarnConfiguration(conf));
- for (String host : rmServers) {
- try {
- Client client = Client.create();
- StringBuilder sb = new StringBuilder();
- sb.append(scheme);
- sb.append(host);
- sb.append(path);
- if (!useKerberos) {
- try {
- String username = UserGroupInformation.getCurrentUser().getShortUserName();
- sb.append("?user.name=");
- sb.append(username);
- } catch (IOException e) {
- LOG.debug("Fail to resolve username: {}", e);
- }
- }
- WebResource webResource = client
- .resource(sb.toString());
- if (useKerberos) {
- AuthenticatedURL.Token token = new AuthenticatedURL.Token();
- webResource.header("WWW-Authenticate", token);
- }
- ClientResponse test = webResource.get(ClientResponse.class);
- if (test.getStatus() == 200) {
- rmAddress = host;
- break;
- }
- } catch (Exception e) {
- LOG.debug("Fail to connect to: "+host, e);
- }
- }
- return scheme+rmAddress;
- }
-
- /**
- * Compute active resource manager API service location.
- *
- * @param appName - YARN service name
- * @return URI to API Service
- * @throws IOException
- */
- private String getServicePath(String appName) throws IOException {
- String url = getRMWebAddress();
- StringBuilder api = new StringBuilder();
- api.append(url);
- api.append("/app/v1/services");
- if (appName != null) {
- api.append("/");
- api.append(appName);
- }
- Configuration conf = getConfig();
- if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase("simple")) {
- api.append("?user.name=" + UrlEncoded
- .encodeString(System.getProperty("user.name")));
- }
- return api.toString();
- }
-
- private String getInstancesPath(String appName) throws IOException {
- Preconditions.checkNotNull(appName);
- String url = getRMWebAddress();
- StringBuilder api = new StringBuilder();
- api.append(url);
- api.append("/app/v1/services/").append(appName).append("/")
- .append(RestApiConstants.COMP_INSTANCES);
- Configuration conf = getConfig();
- if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
- "simple")) {
- api.append("?user.name=" + UrlEncoded
- .encodeString(System.getProperty("user.name")));
- }
- return api.toString();
- }
-
- private String getComponentsPath(String appName) throws IOException {
- Preconditions.checkNotNull(appName);
- String url = getRMWebAddress();
- StringBuilder api = new StringBuilder();
- api.append(url);
- api.append("/app/v1/services/").append(appName).append("/")
- .append(RestApiConstants.COMPONENTS);
- Configuration conf = getConfig();
- if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
- "simple")) {
- api.append("?user.name=" + UrlEncoded
- .encodeString(System.getProperty("user.name")));
- }
- return api.toString();
- }
-
- private Builder getApiClient() throws IOException {
- return getApiClient(getServicePath(null));
- }
-
- /**
- * Setup API service web request.
- *
- * @param requestPath
- * @return
- * @throws IOException
- */
- private Builder getApiClient(String requestPath)
- throws IOException {
- Client client = Client.create(getClientConfig());
- Configuration conf = getConfig();
- client.setChunkedEncodingSize(null);
- Builder builder = client
- .resource(requestPath).type(MediaType.APPLICATION_JSON);
- if (conf.get("hadoop.http.authentication.type").equals("kerberos")) {
- AuthenticatedURL.Token token = new AuthenticatedURL.Token();
- builder.header("WWW-Authenticate", token);
- }
- return builder
- .accept("application/json;charset=utf-8");
- }
-
- private ClientConfig getClientConfig() {
- ClientConfig config = new DefaultClientConfig();
- config.getProperties().put(
- ClientConfig.PROPERTY_CHUNKED_ENCODING_SIZE, 0);
- config.getProperties().put(
- ClientConfig.PROPERTY_BUFFER_RESPONSE_ENTITY_ON_EXCEPTION, true);
- return config;
- }
-
- private int processResponse(ClientResponse response) {
- response.bufferEntity();
- String output;
- if (response.getStatus() == 401) {
- LOG.error("Authentication required");
- return EXIT_EXCEPTION_THROWN;
- }
- if (response.getStatus() == 503) {
- LOG.error("YARN Service is unavailable or disabled.");
- return EXIT_EXCEPTION_THROWN;
- }
- try {
- ServiceStatus ss = response.getEntity(ServiceStatus.class);
- output = ss.getDiagnostics();
- } catch (Throwable t) {
- output = response.getEntity(String.class);
- }
- if (output==null) {
- output = response.getEntity(String.class);
- }
- if (response.getStatus() <= 299) {
- LOG.info(output);
- return EXIT_SUCCESS;
- } else {
- LOG.error(output);
- return EXIT_EXCEPTION_THROWN;
- }
- }
-
- /**
- * Utility method to load Service json from disk or from
- * YARN examples.
- *
- * @param fileName - path to yarnfile
- * @param serviceName - YARN Service Name
- * @param lifetime - application lifetime
- * @param queue - Queue to submit application
- * @return
- * @throws IOException
- * @throws YarnException
- */
- public Service loadAppJsonFromLocalFS(String fileName, String serviceName,
- Long lifetime, String queue) throws IOException, YarnException {
- File file = new File(fileName);
- if (!file.exists() && fileName.equals(file.getName())) {
- String examplesDirStr = System.getenv("YARN_SERVICE_EXAMPLES_DIR");
- String[] examplesDirs;
- if (examplesDirStr == null) {
- String yarnHome = System
- .getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
- examplesDirs = new String[]{
- yarnHome + "/share/hadoop/yarn/yarn-service-examples",
- yarnHome + "/yarn-service-examples"
- };
- } else {
- examplesDirs = StringUtils.split(examplesDirStr, ":");
- }
- for (String dir : examplesDirs) {
- file = new File(MessageFormat.format("{0}/{1}/{2}.json",
- dir, fileName, fileName));
- if (file.exists()) {
- break;
- }
- // Then look for secondary location.
- file = new File(MessageFormat.format("{0}/{1}.json",
- dir, fileName));
- if (file.exists()) {
- break;
- }
- }
- }
- if (!file.exists()) {
- throw new YarnException("File or example could not be found: " +
- fileName);
- }
- Path filePath = new Path(file.getAbsolutePath());
- LOG.info("Loading service definition from local FS: " + filePath);
- Service service = jsonSerDeser
- .load(FileSystem.getLocal(getConfig()), filePath);
- if (!StringUtils.isEmpty(serviceName)) {
- service.setName(serviceName);
- }
- if (lifetime != null && lifetime > 0) {
- service.setLifetime(lifetime);
- }
- if (!StringUtils.isEmpty(queue)) {
- service.setQueue(queue);
- }
- return service;
- }
-
- /**
- * Launch YARN service application.
- *
- * @param fileName - path to yarnfile
- * @param appName - YARN Service Name
- * @param lifetime - application lifetime
- * @param queue - Queue to submit application
- */
- @Override
- public int actionLaunch(String fileName, String appName, Long lifetime,
- String queue) throws IOException, YarnException {
- int result = EXIT_SUCCESS;
- try {
- Service service =
- loadAppJsonFromLocalFS(fileName, appName, lifetime, queue);
- String buffer = jsonSerDeser.toJson(service);
- ClientResponse response = getApiClient()
- .post(ClientResponse.class, buffer);
- result = processResponse(response);
- } catch (Exception e) {
- LOG.error("Fail to launch application: ", e);
- result = EXIT_EXCEPTION_THROWN;
- }
- return result;
- }
-
- /**
- * Stop YARN service application.
- *
- * @param appName - YARN Service Name
- */
- @Override
- public int actionStop(String appName) throws IOException, YarnException {
- int result = EXIT_SUCCESS;
- try {
- Service service = new Service();
- service.setName(appName);
- service.setState(ServiceState.STOPPED);
- String buffer = jsonSerDeser.toJson(service);
- ClientResponse response = getApiClient(getServicePath(appName))
- .put(ClientResponse.class, buffer);
- result = processResponse(response);
- } catch (Exception e) {
- LOG.error("Fail to stop application: ", e);
- result = EXIT_EXCEPTION_THROWN;
- }
- return result;
- }
-
- /**
- * Start YARN service application.
- *
- * @param appName - YARN Service Name
- */
- @Override
- public int actionStart(String appName) throws IOException, YarnException {
- int result = EXIT_SUCCESS;
- try {
- Service service = new Service();
- service.setName(appName);
- service.setState(ServiceState.STARTED);
- String buffer = jsonSerDeser.toJson(service);
- ClientResponse response = getApiClient(getServicePath(appName))
- .put(ClientResponse.class, buffer);
- result = processResponse(response);
- } catch (Exception e) {
- LOG.error("Fail to start application: ", e);
- result = EXIT_EXCEPTION_THROWN;
- }
- return result;
- }
-
- /**
- * Save Service configuration.
- *
- * @param fileName - path to Yarnfile
- * @param appName - YARN Service Name
- * @param lifetime - container life time
- * @param queue - Queue to submit the application
- */
- @Override
- public int actionSave(String fileName, String appName, Long lifetime,
- String queue) throws IOException, YarnException {
- int result = EXIT_SUCCESS;
- try {
- Service service =
- loadAppJsonFromLocalFS(fileName, appName, lifetime, queue);
- service.setState(ServiceState.STOPPED);
- String buffer = jsonSerDeser.toJson(service);
- ClientResponse response = getApiClient()
- .post(ClientResponse.class, buffer);
- result = processResponse(response);
- } catch (Exception e) {
- LOG.error("Fail to save application: ", e);
- result = EXIT_EXCEPTION_THROWN;
- }
- return result;
- }
-
- /**
- * Decommission a YARN service.
- *
- * @param appName - YARN Service Name
- */
- @Override
- public int actionDestroy(String appName) throws IOException, YarnException {
- int result = EXIT_SUCCESS;
- try {
- ClientResponse response = getApiClient(getServicePath(appName))
- .delete(ClientResponse.class);
- result = processResponse(response);
- } catch (Exception e) {
- LOG.error("Fail to destroy application: ", e);
- result = EXIT_EXCEPTION_THROWN;
- }
- return result;
- }
-
- /**
- * Change number of containers associated with a service.
- *
- * @param appName - YARN Service Name
- * @param componentCounts - list of components and desired container count
- */
- @Override
- public int actionFlex(String appName, Map<String, String> componentCounts)
- throws IOException, YarnException {
- int result = EXIT_SUCCESS;
- try {
- Service service = new Service();
- service.setName(appName);
- service.setState(ServiceState.FLEX);
- for (Map.Entry<String, String> entry : componentCounts.entrySet()) {
- Component component = new Component();
- component.setName(entry.getKey());
- Long numberOfContainers = Long.parseLong(entry.getValue());
- component.setNumberOfContainers(numberOfContainers);
- service.addComponent(component);
- }
- String buffer = jsonSerDeser.toJson(service);
- ClientResponse response = getApiClient(getServicePath(appName))
- .put(ClientResponse.class, buffer);
- result = processResponse(response);
- } catch (Exception e) {
- LOG.error("Fail to flex application: ", e);
- result = EXIT_EXCEPTION_THROWN;
- }
- return result;
- }
-
- @Override
- public int enableFastLaunch(String destinationFolder) throws IOException, YarnException {
- ServiceClient sc = new ServiceClient();
- sc.init(getConfig());
- sc.start();
- int result = sc.enableFastLaunch(destinationFolder);
- sc.close();
- return result;
- }
-
- /**
- * Retrieve Service Status through REST API.
- *
- * @param appIdOrName - YARN application ID or application name
- * @return Status output
- */
- @Override
- public String getStatusString(String appIdOrName) throws IOException,
- YarnException {
- String output = "";
- String appName;
- try {
- ApplicationId appId = ApplicationId.fromString(appIdOrName);
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
- appName = appReport.getName();
- } catch (IllegalArgumentException e) {
- // not app Id format, may be app name
- appName = appIdOrName;
- ServiceApiUtil.validateNameFormat(appName, getConfig());
- }
- try {
- ClientResponse response = getApiClient(getServicePath(appName))
- .get(ClientResponse.class);
- if (response.getStatus() == 404) {
- StringBuilder sb = new StringBuilder();
- sb.append(" Service ");
- sb.append(appName);
- sb.append(" not found");
- return sb.toString();
- }
- if (response.getStatus() != 200) {
- StringBuilder sb = new StringBuilder();
- sb.append(appName);
- sb.append(" Failed : HTTP error code : ");
- sb.append(response.getStatus());
- return sb.toString();
- }
- output = response.getEntity(String.class);
- } catch (Exception e) {
- LOG.error("Fail to check application status: ", e);
- }
- return output;
- }
-
- @Override
- public int initiateUpgrade(String appName,
- String fileName, boolean autoFinalize) throws IOException, YarnException {
- int result;
- try {
- Service service =
- loadAppJsonFromLocalFS(fileName, appName, null, null);
- if (autoFinalize) {
- service.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
- } else {
- service.setState(ServiceState.UPGRADING);
- }
- String buffer = jsonSerDeser.toJson(service);
- ClientResponse response = getApiClient(getServicePath(appName))
- .put(ClientResponse.class, buffer);
- result = processResponse(response);
- } catch (Exception e) {
- LOG.error("Failed to upgrade application: ", e);
- result = EXIT_EXCEPTION_THROWN;
- }
- return result;
- }
-
- @Override
- public int actionUpgradeInstances(String appName, List<String> compInstances)
- throws IOException, YarnException {
- int result;
- Container[] toUpgrade = new Container[compInstances.size()];
- try {
- int idx = 0;
- for (String instanceName : compInstances) {
- Container container = new Container();
- container.setComponentInstanceName(instanceName);
- container.setState(ContainerState.UPGRADING);
- toUpgrade[idx++] = container;
- }
- String buffer = CONTAINER_JSON_SERDE.toJson(toUpgrade);
- ClientResponse response = getApiClient(getInstancesPath(appName))
- .put(ClientResponse.class, buffer);
- result = processResponse(response);
- } catch (Exception e) {
- LOG.error("Failed to upgrade component instance: ", e);
- result = EXIT_EXCEPTION_THROWN;
- }
- return result;
- }
-
- @Override
- public int actionUpgradeComponents(String appName, List<String> components)
- throws IOException, YarnException {
- int result;
- Component[] toUpgrade = new Component[components.size()];
- try {
- int idx = 0;
- for (String compName : components) {
- Component component = new Component();
- component.setName(compName);
- component.setState(ComponentState.UPGRADING);
- toUpgrade[idx++] = component;
- }
- String buffer = COMP_JSON_SERDE.toJson(toUpgrade);
- ClientResponse response = getApiClient(getComponentsPath(appName))
- .put(ClientResponse.class, buffer);
- result = processResponse(response);
- } catch (Exception e) {
- LOG.error("Failed to upgrade components: ", e);
- result = EXIT_EXCEPTION_THROWN;
- }
- return result;
- }
-
- private static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
- new JsonSerDeser<>(Container[].class,
- PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
-
- private static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
- new JsonSerDeser<>(Component[].class,
- PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java
deleted file mode 100644
index f9cfa92..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.service.client;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.service.SystemServiceManager;
-import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.api.records.ServiceState;
-import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
-
-/**
- * SystemServiceManager implementation.
- * Scan for configure system service path.
- *
- * The service path structure is as follows:
- * SYSTEM_SERVICE_DIR_PATH
- * |---- sync
- * | |--- user1
- * | | |---- service1.yarnfile
- * | | |---- service2.yarnfile
- * | |--- user2
- * | | |---- service1.yarnfile
- * | | ....
- * | |
- * |---- async
- * | |--- user3
- * | | |---- service1.yarnfile
- * | | |---- service2.yarnfile
- * | |--- user4
- * | | |---- service1.yarnfile
- * | | ....
- * | |
- *
- * sync: These services are launched at the time of service start synchronously.
- * It is a blocking service start.
- * async: These services are launched in separate thread without any delay after
- * service start. Non-blocking service start.
- */
-public class SystemServiceManagerImpl extends AbstractService
- implements SystemServiceManager {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(SystemServiceManagerImpl.class);
-
- private static final String YARN_FILE_SUFFIX = ".yarnfile";
- private static final String SYNC = "sync";
- private static final String ASYNC = "async";
-
- private FileSystem fs;
- private Path systemServiceDir;
- private AtomicBoolean stopExecutors = new AtomicBoolean(false);
- private Map<String, Set<Service>> syncUserServices = new HashMap<>();
- private Map<String, Set<Service>> asyncUserServices = new HashMap<>();
- private UserGroupInformation loginUGI;
- private Thread serviceLaucher;
-
- @VisibleForTesting
- private int badFileNameExtensionSkipCounter;
- @VisibleForTesting
- private Map<String, Integer> ignoredUserServices =
- new HashMap<>();
- @VisibleForTesting
- private int badDirSkipCounter;
-
- public SystemServiceManagerImpl() {
- super(SystemServiceManagerImpl.class.getName());
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- String dirPath =
- conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY);
- if (dirPath != null) {
- systemServiceDir = new Path(dirPath);
- LOG.info("System Service Directory is configured to {}",
- systemServiceDir);
- fs = systemServiceDir.getFileSystem(conf);
- this.loginUGI = UserGroupInformation.isSecurityEnabled() ?
- UserGroupInformation.getLoginUser() :
- UserGroupInformation.getCurrentUser();
- LOG.info("UserGroupInformation initialized to {}", loginUGI);
- }
- }
-
- @Override
- protected void serviceStart() throws Exception {
- scanForUserServices();
- launchUserService(syncUserServices);
- // Create a thread and submit services in background otherwise it
- // block RM switch time.
- serviceLaucher = new Thread(createRunnable());
- serviceLaucher.setName("System service launcher");
- serviceLaucher.start();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- LOG.info("Stopping {}", getName());
- stopExecutors.set(true);
-
- if (serviceLaucher != null) {
- serviceLaucher.interrupt();
- try {
- serviceLaucher.join();
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted Exception while stopping", ie);
- }
- }
- }
-
- private Runnable createRunnable() {
- return new Runnable() {
- @Override
- public void run() {
- launchUserService(asyncUserServices);
- }
- };
- }
-
- void launchUserService(Map<String, Set<Service>> userServices) {
- for (Map.Entry<String, Set<Service>> entry : userServices.entrySet()) {
- String user = entry.getKey();
- Set<Service> services = entry.getValue();
- if (services.isEmpty()) {
- continue;
- }
- ServiceClient serviceClient = null;
- try {
- UserGroupInformation userUgi = getProxyUser(user);
- serviceClient = createServiceClient(userUgi);
- for (Service service : services) {
- LOG.info("POST: createService = {} user = {}", service, userUgi);
- try {
- launchServices(userUgi, serviceClient, service);
- } catch (IOException | UndeclaredThrowableException e) {
- if (e.getCause() != null) {
- LOG.warn(e.getCause().getMessage());
- } else {
- String message =
- "Failed to create service " + service.getName() + " : ";
- LOG.error(message, e);
- }
- }
- }
- } catch (InterruptedException e) {
- LOG.warn("System service launcher thread interrupted", e);
- break;
- } catch (Exception e) {
- LOG.error("Error while submitting services for user " + user, e);
- } finally {
- if (serviceClient != null) {
- try {
- serviceClient.close();
- } catch (IOException e) {
- LOG.warn("Error while closing serviceClient for user {}", user);
- }
- }
- }
- }
- }
-
- private ServiceClient createServiceClient(UserGroupInformation userUgi)
- throws IOException, InterruptedException {
- ServiceClient serviceClient =
- userUgi.doAs(new PrivilegedExceptionAction<ServiceClient>() {
- @Override public ServiceClient run()
- throws IOException, YarnException {
- ServiceClient sc = getServiceClient();
- sc.init(getConfig());
- sc.start();
- return sc;
- }
- });
- return serviceClient;
- }
-
- private void launchServices(UserGroupInformation userUgi,
- ServiceClient serviceClient, Service service)
- throws IOException, InterruptedException {
- if (service.getState() == ServiceState.STOPPED) {
- userUgi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override public Void run() throws IOException, YarnException {
- serviceClient.actionBuild(service);
- return null;
- }
- });
- LOG.info("Service {} version {} saved.", service.getName(),
- service.getVersion());
- } else {
- ApplicationId applicationId =
- userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() {
- @Override public ApplicationId run()
- throws IOException, YarnException {
- ApplicationId applicationId = serviceClient.actionCreate(service);
- return applicationId;
- }
- });
- LOG.info("Service {} submitted with Application ID: {}",
- service.getName(), applicationId);
- }
- }
-
- ServiceClient getServiceClient() {
- return new ServiceClient();
- }
-
- private UserGroupInformation getProxyUser(String user) {
- UserGroupInformation ugi;
- if (UserGroupInformation.isSecurityEnabled()) {
- ugi = UserGroupInformation.createProxyUser(user, loginUGI);
- } else {
- ugi = UserGroupInformation.createRemoteUser(user);
- }
- return ugi;
- }
-
- // scan for both launch service types i.e sync and async
- void scanForUserServices() throws IOException {
- if (systemServiceDir == null) {
- return;
- }
- try {
- LOG.info("Scan for launch type on {}", systemServiceDir);
- RemoteIterator<FileStatus> iterLaunchType = list(systemServiceDir);
- while (iterLaunchType.hasNext()) {
- FileStatus launchType = iterLaunchType.next();
- if (!launchType.isDirectory()) {
- LOG.debug("Scanner skips for unknown file {}", launchType.getPath());
- continue;
- }
- if (launchType.getPath().getName().equals(SYNC)) {
- scanForUserServiceDefinition(launchType.getPath(), syncUserServices);
- } else if (launchType.getPath().getName().equals(ASYNC)) {
- scanForUserServiceDefinition(launchType.getPath(), asyncUserServices);
- } else {
- badDirSkipCounter++;
- LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath());
- }
- }
- } catch (FileNotFoundException e) {
- LOG.warn("System service directory {} doesn't not exist.",
- systemServiceDir);
- }
- }
-
- // Files are under systemServiceDir/<users>. Scan for 2 levels
- // 1st level for users
- // 2nd level for service definitions under user
- private void scanForUserServiceDefinition(Path userDirPath,
- Map<String, Set<Service>> userServices) throws IOException {
- LOG.info("Scan for users on {}", userDirPath);
- RemoteIterator<FileStatus> iterUsers = list(userDirPath);
- while (iterUsers.hasNext()) {
- FileStatus userDir = iterUsers.next();
- // if 1st level is not user directory then skip it.
- if (!userDir.isDirectory()) {
- LOG.info(
- "Service definition {} doesn't belong to any user. Ignoring.. ",
- userDir.getPath().getName());
- continue;
- }
- String userName = userDir.getPath().getName();
- LOG.info("Scanning service definitions for user {}.", userName);
-
- //2nd level scan
- RemoteIterator<FileStatus> iterServices = list(userDir.getPath());
- while (iterServices.hasNext()) {
- FileStatus serviceCache = iterServices.next();
- String filename = serviceCache.getPath().getName();
- if (!serviceCache.isFile()) {
- LOG.info("Scanner skips for unknown dir {}", filename);
- continue;
- }
- if (!filename.endsWith(YARN_FILE_SUFFIX)) {
- LOG.info("Scanner skips for unknown file extension, filename = {}",
- filename);
- badFileNameExtensionSkipCounter++;
- continue;
- }
- Service service = getServiceDefinition(serviceCache.getPath());
- if (service != null) {
- Set<Service> services = userServices.get(userName);
- if (services == null) {
- services = new HashSet<>();
- userServices.put(userName, services);
- }
- if (!services.add(service)) {
- int count = ignoredUserServices.containsKey(userName) ?
- ignoredUserServices.get(userName) : 0;
- ignoredUserServices.put(userName, count + 1);
- LOG.warn(
- "Ignoring service {} for the user {} as it is already present,"
- + " filename = {}", service.getName(), userName, filename);
- } else {
- LOG.info("Added service {} for the user {}, filename = {}",
- service.getName(), userName, filename);
- }
- }
- }
- }
- }
-
- private Service getServiceDefinition(Path filePath) {
- Service service = null;
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading service definition from FS: " + filePath);
- }
- service = jsonSerDeser.load(fs, filePath);
- } catch (IOException e) {
- LOG.info("Error while loading service definition from FS: {}", e);
- }
- return service;
- }
-
- private RemoteIterator<FileStatus> list(Path path) throws IOException {
- return new StoppableRemoteIterator(fs.listStatusIterator(path));
- }
-
- @VisibleForTesting Map<String, Integer> getIgnoredUserServices() {
- return ignoredUserServices;
- }
-
- private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
- private final RemoteIterator<FileStatus> remote;
-
- StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
- this.remote = remote;
- }
-
- @Override public boolean hasNext() throws IOException {
- return !stopExecutors.get() && remote.hasNext();
- }
-
- @Override public FileStatus next() throws IOException {
- return remote.next();
- }
- }
-
- @VisibleForTesting
- Map<String, Set<Service>> getSyncUserServices() {
- return syncUserServices;
- }
-
- @VisibleForTesting
- int getBadFileNameExtensionSkipCounter() {
- return badFileNameExtensionSkipCounter;
- }
-
- @VisibleForTesting
- int getBadDirSkipCounter() {
- return badDirSkipCounter;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java
deleted file mode 100644
index cf5ce11..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package org.apache.hadoop.yarn.service.client contains classes
- * for YARN Services Client API.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.service.client;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a23ff8d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
deleted file mode 100644
index 46c9abe..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ /dev/null
@@ -1,818 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.service.webapp;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.service.api.records.Component;
-import org.apache.hadoop.yarn.service.api.records.ComponentState;
-import org.apache.hadoop.yarn.service.api.records.Container;
-import org.apache.hadoop.yarn.service.api.records.ContainerState;
-import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.api.records.ServiceState;
-import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
-import org.apache.hadoop.yarn.service.client.ServiceClient;
-import org.apache.hadoop.yarn.service.conf.RestApiConstants;
-import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
-import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;
-import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*;
-
-/**
- * The rest API endpoints for users to manage services on YARN.
- */
-@Singleton
-@Path(CONTEXT_ROOT)
-public class ApiServer {
-
- public ApiServer() {
- super();
- }
-
- @Inject
- public ApiServer(Configuration conf) {
- super();
- }
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ApiServer.class);
- private static Configuration YARN_CONFIG = new YarnConfiguration();
- private ServiceClient serviceClientUnitTest;
- private boolean unitTest = false;
-
- static {
- init();
- }
-
- // initialize all the common resources - order is important
- private static void init() {
- }
-
- @GET
- @Path(VERSION)
- @Consumes({ MediaType.APPLICATION_JSON })
- @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" })
- public Response getVersion() {
- String version = VersionInfo.getBuildVersion();
- LOG.info(version);
- return Response.ok("{ \"hadoop_version\": \"" + version + "\"}").build();
- }
-
- @POST
- @Path(SERVICE_ROOT_PATH)
- @Consumes({ MediaType.APPLICATION_JSON })
- @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" })
- public Response createService(@Context HttpServletRequest request,
- Service service) {
- ServiceStatus serviceStatus = new ServiceStatus();
- try {
- UserGroupInformation ugi = getProxyUser(request);
- LOG.info("POST: createService = {} user = {}", service, ugi);
- if(service.getState()==ServiceState.STOPPED) {
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws YarnException, IOException {
- ServiceClient sc = getServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- sc.actionBuild(service);
- sc.close();
- return null;
- }
- });
- serviceStatus.setDiagnostics("Service " + service.getName() +
- " version " + service.getVersion() + " saved.");
- } else {
- ApplicationId applicationId = ugi
- .doAs(new PrivilegedExceptionAction<ApplicationId>() {
- @Override
- public ApplicationId run() throws IOException, YarnException {
- ServiceClient sc = getServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- ApplicationId applicationId = sc.actionCreate(service);
- sc.close();
- return applicationId;
- }
- });
- serviceStatus.setDiagnostics("Application ID: " + applicationId);
- }
- serviceStatus.setState(ACCEPTED);
- serviceStatus.setUri(
- CONTEXT_ROOT + SERVICE_ROOT_PATH + "/" + service
- .getName());
- return formatResponse(Status.ACCEPTED, serviceStatus);
- } catch (AccessControlException e) {
- serviceStatus.setDiagnostics(e.getMessage());
- return formatResponse(Status.FORBIDDEN, e.getCause().getMessage());
- } catch (IllegalArgumentException e) {
- return formatResponse(Status.BAD_REQUEST, e.getMessage());
- } catch (IOException | InterruptedException e) {
- String message = "Failed to create service " + service.getName()
- + ": {}";
- LOG.error(message, e);
- return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- } catch (UndeclaredThrowableException e) {
- String message = "Failed to create service " + service.getName()
- + ": {}";
- LOG.error(message, e);
- if (e.getCause().getMessage().contains("already exists")) {
- message = "Service name " + service.getName() + " is already taken.";
- } else {
- message = e.getCause().getMessage();
- }
- return formatResponse(Status.INTERNAL_SERVER_ERROR,
- message);
- }
- }
-
- @GET
- @Path(SERVICE_PATH)
- @Consumes({ MediaType.APPLICATION_JSON })
- @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" })
- public Response getService(@Context HttpServletRequest request,
- @PathParam(SERVICE_NAME) String appName) {
- ServiceStatus serviceStatus = new ServiceStatus();
- try {
- if (appName == null) {
- throw new IllegalArgumentException("Service name cannot be null.");
- }
- UserGroupInformation ugi = getProxyUser(request);
- LOG.info("GET: getService for appName = {} user = {}", appName, ugi);
- Service app = getServiceFromClient(ugi, appName);
- return Response.ok(app).build();
- } catch (AccessControlException e) {
- return formatResponse(Status.FORBIDDEN, e.getMessage());
- } catch (IllegalArgumentException e) {
- serviceStatus.setDiagnostics(e.getMessage());
- serviceStatus.setCode(ERROR_CODE_APP_NAME_INVALID);
- return Response.status(Status.NOT_FOUND).entity(serviceStatus)
- .build();
- } catch (FileNotFoundException e) {
- serviceStatus.setDiagnostics("Service " + appName + " not found");
- serviceStatus.setCode(ERROR_CODE_APP_NAME_INVALID);
- return Response.status(Status.NOT_FOUND).entity(serviceStatus)
- .build();
- } catch (IOException | InterruptedException e) {
- LOG.error("Get service failed: {}", e);
- return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- } catch (UndeclaredThrowableException e) {
- LOG.error("Get service failed: {}", e);
- return formatResponse(Status.INTERNAL_SERVER_ERROR,
- e.getCause().getMessage());
- }
- }
-
- @DELETE
- @Path(SERVICE_PATH)
- @Consumes({ MediaType.APPLICATION_JSON })
- @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" })
- public Response deleteService(@Context HttpServletRequest request,
- @PathParam(SERVICE_NAME) String appName) {
- try {
- if (appName == null) {
- throw new IllegalArgumentException("Service name can not be null.");
- }
- UserGroupInformation ugi = getProxyUser(request);
- LOG.info("DELETE: deleteService for appName = {} user = {}",
- appName, ugi);
- return stopService(appName, true, ugi);
- } catch (AccessControlException e) {
- return formatResponse(Status.FORBIDDEN, e.getMessage());
- } catch (IllegalArgumentException e) {
- return formatResponse(Status.BAD_REQUEST, e.getMessage());
- } catch (UndeclaredThrowableException e) {
- LOG.error("Fail to stop service: {}", e);
- return formatResponse(Status.BAD_REQUEST,
- e.getCause().getMessage());
- } catch (YarnException | FileNotFoundException e) {
- return formatResponse(Status.NOT_FOUND, e.getMessage());
- } catch (Exception e) {
- LOG.error("Fail to stop service: {}", e);
- return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
- }
-
- private Response stopService(String appName, boolean destroy,
- final UserGroupInformation ugi) throws Exception {
- int result = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
- @Override
- public Integer run() throws Exception {
- int result = 0;
- ServiceClient sc = getServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- Exception stopException = null;
- try {
- result = sc.actionStop(appName, destroy);
- if (result == EXIT_SUCCESS) {
- LOG.info("Successfully stopped service {}", appName);
- }
- } catch (Exception e) {
- LOG.info("Got exception stopping service", e);
- stopException = e;
- }
- if (destroy) {
- result = sc.actionDestroy(appName);
- if (result == EXIT_SUCCESS) {
- LOG.info("Successfully deleted service {}", appName);
- }
- } else {
- if (stopException != null) {
- throw stopException;
- }
- }
- sc.close();
- return result;
- }
- });
- ServiceStatus serviceStatus = new ServiceStatus();
- if (destroy) {
- if (result == EXIT_SUCCESS) {
- serviceStatus.setDiagnostics("Successfully destroyed service " +
- appName);
- } else {
- if (result == EXIT_NOT_FOUND) {
- serviceStatus
- .setDiagnostics("Service " + appName + " doesn't exist");
- return formatResponse(Status.BAD_REQUEST, serviceStatus);
- } else {
- serviceStatus
- .setDiagnostics("Service " + appName + " error cleaning up " +
- "registry");
- return formatResponse(Status.INTERNAL_SERVER_ERROR, serviceStatus);
- }
- }
- } else {
- if (result == EXIT_COMMAND_ARGUMENT_ERROR) {
- serviceStatus
- .setDiagnostics("Service " + appName + " is already stopped");
- return formatResponse(Status.BAD_REQUEST, serviceStatus);
- } else {
- serviceStatus.setDiagnostics("Successfully stopped service " + appName);
- }
- }
- return formatResponse(Status.OK, serviceStatus);
- }
-
- @PUT
- @Path(COMPONENTS_PATH)
- @Consumes({MediaType.APPLICATION_JSON})
- @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
- public Response updateComponents(@Context HttpServletRequest request,
- @PathParam(SERVICE_NAME) String serviceName,
- List<Component> requestComponents) {
-
- try {
- if (requestComponents == null || requestComponents.isEmpty()) {
- throw new YarnException("No components provided.");
- }
- UserGroupInformation ugi = getProxyUser(request);
- Set<String> compNamesToUpgrade = new HashSet<>();
- requestComponents.forEach(reqComp -> {
- if (reqComp.getState() != null &&
- reqComp.getState().equals(ComponentState.UPGRADING)) {
- compNamesToUpgrade.add(reqComp.getName());
- }
- });
- LOG.info("PUT: upgrade components {} for service {} " +
- "user = {}", compNamesToUpgrade, serviceName, ugi);
- return processComponentsUpgrade(ugi, serviceName, compNamesToUpgrade);
- } catch (AccessControlException e) {
- return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
- } catch (YarnException e) {
- return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
- } catch (IOException | InterruptedException e) {
- return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
- e.getMessage());
- } catch (UndeclaredThrowableException e) {
- return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
- e.getCause().getMessage());
- }
- }
-
- @PUT
- @Path(COMPONENT_PATH)
- @Consumes({ MediaType.APPLICATION_JSON })
- @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8",
- MediaType.TEXT_PLAIN })
- public Response updateComponent(@Context HttpServletRequest request,
- @PathParam(SERVICE_NAME) String appName,
- @PathParam(COMPONENT_NAME) String componentName, Component component) {
-
- try {
- if (component == null) {
- throw new YarnException("No component data provided");
- }
- if (component.getName() != null
- && !component.getName().equals(componentName)) {
- String msg = "Component name in the request object ("
- + component.getName() + ") does not match that in the URI path ("
- + componentName + ")";
- throw new YarnException(msg);
- }
- UserGroupInformation ugi = getProxyUser(request);
- if (component.getState() != null &&
- component.getState().equals(ComponentState.UPGRADING)) {
- LOG.info("PUT: upgrade component {} for service {} " +
- "user = {}", component.getName(), appName, ugi);
- return processComponentsUpgrade(ugi, appName,
- Sets.newHashSet(componentName));
- }
-
- if (component.getNumberOfContainers() == null) {
- throw new YarnException("No container count provided");
- }
- if (component.getNumberOfContainers() < 0) {
- String message = "Invalid number of containers specified "
- + component.getNumberOfContainers();
- throw new YarnException(message);
- }
- Map<String, Long> original = ugi
- .doAs(new PrivilegedExceptionAction<Map<String, Long>>() {
- @Override
- public Map<String, Long> run() throws YarnException, IOException {
- ServiceClient sc = new ServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- Map<String, Long> original = sc.flexByRestService(appName,
- Collections.singletonMap(componentName,
- component.getNumberOfContainers()));
- sc.close();
- return original;
- }
- });
- ServiceStatus status = new ServiceStatus();
- status.setDiagnostics(
- "Updating component (" + componentName + ") size from " + original
- .get(componentName) + " to " + component.getNumberOfContainers());
- return formatResponse(Status.OK, status);
- } catch (AccessControlException e) {
- return formatResponse(Status.FORBIDDEN, e.getMessage());
- } catch (YarnException e) {
- return formatResponse(Status.BAD_REQUEST, e.getMessage());
- } catch (IOException | InterruptedException e) {
- return formatResponse(Status.INTERNAL_SERVER_ERROR,
- e.getMessage());
- } catch (UndeclaredThrowableException e) {
- return formatResponse(Status.INTERNAL_SERVER_ERROR,
- e.getCause().getMessage());
- }
- }
-
- @PUT
- @Path(SERVICE_PATH)
- @Consumes({ MediaType.APPLICATION_JSON })
- @Produces({ MediaType.APPLICATION_JSON + ";charset=utf-8" })
- public Response updateService(@Context HttpServletRequest request,
- @PathParam(SERVICE_NAME) String appName,
- Service updateServiceData) {
- try {
- UserGroupInformation ugi = getProxyUser(request);
- LOG.info("PUT: updateService for app = {} with data = {} user = {}",
- appName, updateServiceData, ugi);
- // Ignore the app name provided in updateServiceData and always use
- // appName path param
- updateServiceData.setName(appName);
-
- if (updateServiceData.getState() != null
- && updateServiceData.getState() == ServiceState.FLEX) {
- return flexService(updateServiceData, ugi);
- }
- // For STOP the app should be running. If already stopped then this
- // operation will be a no-op. For START it should be in stopped state.
- // If already running then this operation will be a no-op.
- if (updateServiceData.getState() != null
- && updateServiceData.getState() == ServiceState.STOPPED) {
- return stopService(appName, false, ugi);
- }
-
- // If a START is requested
- if (updateServiceData.getState() != null
- && updateServiceData.getState() == ServiceState.STARTED) {
- return startService(appName, ugi);
- }
-
- // If an UPGRADE is requested
- if (updateServiceData.getState() != null && (
- updateServiceData.getState() == ServiceState.UPGRADING ||
- updateServiceData.getState() ==
- ServiceState.UPGRADING_AUTO_FINALIZE)) {
- return upgradeService(updateServiceData, ugi);
- }
-
- // If new lifetime value specified then update it
- if (updateServiceData.getLifetime() != null
- && updateServiceData.getLifetime() > 0) {
- return updateLifetime(appName, updateServiceData, ugi);
- }
- } catch (UndeclaredThrowableException e) {
- return formatResponse(Status.BAD_REQUEST,
- e.getCause().getMessage());
- } catch (AccessControlException e) {
- return formatResponse(Status.FORBIDDEN, e.getMessage());
- } catch (FileNotFoundException e) {
- String message = "Application is not found app: " + appName;
- LOG.error(message, e);
- return formatResponse(Status.NOT_FOUND, e.getMessage());
- } catch (YarnException e) {
- String message = "Service is not found in hdfs: " + appName;
- LOG.error(message, e);
- return formatResponse(Status.NOT_FOUND, e.getMessage());
- } catch (Exception e) {
- String message = "Error while performing operation for app: " + appName;
- LOG.error(message, e);
- return formatResponse(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
-
- // If nothing happens consider it a no-op
- return Response.status(Status.NO_CONTENT).build();
- }
-
- @PUT
- @Path(COMP_INSTANCE_LONG_PATH)
- @Consumes({MediaType.APPLICATION_JSON})
- @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
- public Response updateComponentInstance(@Context HttpServletRequest request,
- @PathParam(SERVICE_NAME) String serviceName,
- @PathParam(COMPONENT_NAME) String componentName,
- @PathParam(COMP_INSTANCE_NAME) String compInstanceName,
- Container reqContainer) {
-
- try {
- UserGroupInformation ugi = getProxyUser(request);
- LOG.info("PUT: update component instance {} for component = {}" +
- " service = {} user = {}", compInstanceName, componentName,
- serviceName, ugi);
- if (reqContainer == null) {
- throw new YarnException("No container data provided.");
- }
- Service service = getServiceFromClient(ugi, serviceName);
- Component component = service.getComponent(componentName);
- if (component == null) {
- throw new YarnException(String.format(
- "The component name in the URI path (%s) is invalid.",
- componentName));
- }
-
- Container liveContainer = component.getComponentInstance(
- compInstanceName);
- if (liveContainer == null) {
- throw new YarnException(String.format(
- "The component (%s) does not have a component instance (%s).",
- componentName, compInstanceName));
- }
-
- if (reqContainer.getState() != null
- && reqContainer.getState().equals(ContainerState.UPGRADING)) {
- return processContainersUpgrade(ugi, service,
- Lists.newArrayList(liveContainer));
- }
- } catch (AccessControlException e) {
- return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
- } catch (YarnException e) {
- return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
- } catch (IOException | InterruptedException e) {
- return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
- e.getMessage());
- } catch (UndeclaredThrowableException e) {
- return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
- e.getCause().getMessage());
- }
- return Response.status(Status.NO_CONTENT).build();
- }
-
- @PUT
- @Path(COMP_INSTANCES_PATH)
- @Consumes({MediaType.APPLICATION_JSON})
- @Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
- public Response updateComponentInstances(@Context HttpServletRequest request,
- @PathParam(SERVICE_NAME) String serviceName,
- List<Container> requestContainers) {
-
- try {
- if (requestContainers == null || requestContainers.isEmpty()) {
- throw new YarnException("No containers provided.");
- }
- UserGroupInformation ugi = getProxyUser(request);
- List<String> toUpgrade = new ArrayList<>();
- for (Container reqContainer : requestContainers) {
- if (reqContainer.getState() != null &&
- reqContainer.getState().equals(ContainerState.UPGRADING)) {
- toUpgrade.add(reqContainer.getComponentInstanceName());
- }
- }
-
- if (!toUpgrade.isEmpty()) {
- Service service = getServiceFromClient(ugi, serviceName);
- LOG.info("PUT: upgrade component instances {} for service = {} " +
- "user = {}", toUpgrade, serviceName, ugi);
- List<Container> liveContainers = ServiceApiUtil
- .getLiveContainers(service, toUpgrade);
-
- return processContainersUpgrade(ugi, service, liveContainers);
- }
- } catch (AccessControlException e) {
- return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
- } catch (YarnException e) {
- return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
- } catch (IOException | InterruptedException e) {
- return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
- e.getMessage());
- } catch (UndeclaredThrowableException e) {
- return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
- e.getCause().getMessage());
- }
- return Response.status(Status.NO_CONTENT).build();
- }
-
- private Response flexService(Service service, UserGroupInformation ugi)
- throws IOException, InterruptedException {
- String appName = service.getName();
- Response response = Response.status(Status.BAD_REQUEST).build();
- Map<String, String> componentCountStrings = new HashMap<String, String>();
- for (Component c : service.getComponents()) {
- componentCountStrings.put(c.getName(),
- c.getNumberOfContainers().toString());
- }
- Integer result = ugi.doAs(new PrivilegedExceptionAction<Integer>() {
-
- @Override
- public Integer run() throws YarnException, IOException {
- int result = 0;
- ServiceClient sc = new ServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- result = sc
- .actionFlex(appName, componentCountStrings);
- sc.close();
- return Integer.valueOf(result);
- }
- });
- if (result == EXIT_SUCCESS) {
- String message = "Service " + appName + " is successfully flexed.";
- LOG.info(message);
- ServiceStatus status = new ServiceStatus();
- status.setDiagnostics(message);
- status.setState(ServiceState.ACCEPTED);
- response = formatResponse(Status.ACCEPTED, status);
- }
- return response;
- }
-
- private Response updateLifetime(String appName, Service updateAppData,
- final UserGroupInformation ugi) throws IOException,
- InterruptedException {
- String newLifeTime = ugi.doAs(new PrivilegedExceptionAction<String>() {
- @Override
- public String run() throws YarnException, IOException {
- ServiceClient sc = getServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- String newLifeTime = sc.updateLifetime(appName,
- updateAppData.getLifetime());
- sc.close();
- return newLifeTime;
- }
- });
- ServiceStatus status = new ServiceStatus();
- status.setDiagnostics(
- "Service (" + appName + ")'s lifeTime is updated to " + newLifeTime
- + ", " + updateAppData.getLifetime() + " seconds remaining");
- return formatResponse(Status.OK, status);
- }
-
- private Response startService(String appName,
- final UserGroupInformation ugi) throws IOException,
- InterruptedException {
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- @Override
- public Void run() throws YarnException, IOException {
- ServiceClient sc = getServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- sc.actionStart(appName);
- sc.close();
- return null;
- }
- });
- LOG.info("Successfully started service " + appName);
- ServiceStatus status = new ServiceStatus();
- status.setDiagnostics("Service " + appName + " is successfully started.");
- status.setState(ServiceState.ACCEPTED);
- return formatResponse(Status.OK, status);
- }
-
- private Response upgradeService(Service service,
- final UserGroupInformation ugi) throws IOException, InterruptedException {
- ServiceStatus status = new ServiceStatus();
- ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
- ServiceClient sc = getServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- sc.initiateUpgrade(service);
- sc.close();
- return null;
- });
- LOG.info("Service {} version {} upgrade initialized", service.getName(),
- service.getVersion());
- status.setDiagnostics("Service " + service.getName() +
- " version " + service.getVersion() + " saved.");
- status.setState(ServiceState.ACCEPTED);
- return formatResponse(Status.ACCEPTED, status);
- }
-
- private Response processComponentsUpgrade(UserGroupInformation ugi,
- String serviceName, Set<String> compNames) throws YarnException,
- IOException, InterruptedException {
- Service service = getServiceFromClient(ugi, serviceName);
- if (service.getState() != ServiceState.UPGRADING) {
- throw new YarnException(
- String.format("The upgrade of service %s has not been initiated.",
- service.getName()));
- }
- List<Container> containersToUpgrade = ServiceApiUtil
- .validateAndResolveCompsUpgrade(service, compNames);
- Integer result = invokeContainersUpgrade(ugi, service, containersToUpgrade);
- if (result == EXIT_SUCCESS) {
- ServiceStatus status = new ServiceStatus();
- status.setDiagnostics(
- "Upgrading components " + Joiner.on(',').join(compNames) + ".");
- return formatResponse(Response.Status.ACCEPTED, status);
- }
- // If result is not a success, consider it a no-op
- return Response.status(Response.Status.NO_CONTENT).build();
- }
-
- private Response processContainersUpgrade(UserGroupInformation ugi,
- Service service, List<Container> containers) throws YarnException,
- IOException, InterruptedException {
-
- if (service.getState() != ServiceState.UPGRADING) {
- throw new YarnException(
- String.format("The upgrade of service %s has not been initiated.",
- service.getName()));
- }
- ServiceApiUtil.validateInstancesUpgrade(containers);
- Integer result = invokeContainersUpgrade(ugi, service, containers);
- if (result == EXIT_SUCCESS) {
- ServiceStatus status = new ServiceStatus();
- status.setDiagnostics(
- "Upgrading component instances " + containers.stream()
- .map(Container::getId).collect(Collectors.joining(",")) + ".");
- return formatResponse(Response.Status.ACCEPTED, status);
- }
- // If result is not a success, consider it a no-op
- return Response.status(Response.Status.NO_CONTENT).build();
- }
-
- private int invokeContainersUpgrade(UserGroupInformation ugi,
- Service service, List<Container> containers) throws IOException,
- InterruptedException {
- return ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
- int result1;
- ServiceClient sc = getServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- result1 = sc.actionUpgrade(service, containers);
- sc.close();
- return result1;
- });
- }
-
- private Service getServiceFromClient(UserGroupInformation ugi,
- String serviceName) throws IOException, InterruptedException {
-
- return ugi.doAs((PrivilegedExceptionAction<Service>) () -> {
- ServiceClient sc = getServiceClient();
- sc.init(YARN_CONFIG);
- sc.start();
- Service app1 = sc.getStatus(serviceName);
- sc.close();
- return app1;
- });
- }
-
- /**
- * Used by negative test case.
- *
- * @param mockServerClient - A mocked version of ServiceClient
- */
- public void setServiceClient(ServiceClient mockServerClient) {
- serviceClientUnitTest = mockServerClient;
- unitTest = true;
- }
-
- private ServiceClient getServiceClient() {
- if (unitTest) {
- return serviceClientUnitTest;
- } else {
- return new ServiceClient();
- }
- }
-
- /**
- * Configure impersonation callback.
- *
- * @param request - web request
- * @return - configured UGI class for proxy callback
- * @throws IOException - if user is not login.
- */
- private UserGroupInformation getProxyUser(HttpServletRequest request)
- throws AccessControlException {
- UserGroupInformation proxyUser;
- UserGroupInformation ugi;
- String remoteUser = request.getRemoteUser();
- try {
- if (UserGroupInformation.isSecurityEnabled()) {
- proxyUser = UserGroupInformation.getLoginUser();
- ugi = UserGroupInformation.createProxyUser(remoteUser, proxyUser);
- } else {
- ugi = UserGroupInformation.createRemoteUser(remoteUser);
- }
- return ugi;
- } catch (IOException e) {
- throw new AccessControlException(e.getCause());
- }
- }
-
- /**
- * Format HTTP response.
- *
- * @param status - HTTP Code
- * @param message - Diagnostic message
- * @return - HTTP response
- */
- private Response formatResponse(Status status, String message) {
- ServiceStatus entity = new ServiceStatus();
- entity.setDiagnostics(message);
- return formatResponse(status, entity);
- }
-
- /**
- * Format HTTP response.
- *
- * @param status - HTTP Code
- * @param entity - ServiceStatus object
- * @return - HTTP response
- */
- private Response formatResponse(Status status, ServiceStatus entity) {
- return Response.status(status).entity(entity).build();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org