You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/10/11 20:37:21 UTC
[17/50] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/RpcBinder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/RpcBinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/RpcBinder.java
new file mode 100644
index 0000000..dd4785d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/RpcBinder.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.rpc;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolProxy;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcEngine;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.common.SliderExitCodes;
+import org.apache.slider.common.tools.Duration;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.ErrorStrings;
+import org.apache.slider.core.exceptions.ServiceNotReadyException;
+import org.apache.slider.core.exceptions.SliderException;
+
+import static org.apache.slider.common.SliderXmlConfKeys.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+
+public class RpcBinder {
+ protected static final Logger log =
+ LoggerFactory.getLogger(RpcBinder.class);
+
+ /**
+ * Create a protobuf server bonded to the specific socket address
+ * @param addr address to listen to; 0.0.0.0 as hostname acceptable
+ * @param conf config
+ * @param secretManager token secret handler
+ * @param numHandlers threads to service requests
+ * @param blockingService service to handle
+ * @param portRangeConfig range of ports
+ * @return the IPC server itself
+ * @throws IOException
+ */
+ public static Server createProtobufServer(InetSocketAddress addr,
+ Configuration conf,
+ SecretManager<? extends TokenIdentifier> secretManager,
+ int numHandlers,
+ BlockingService blockingService,
+ String portRangeConfig) throws
+ IOException {
+ Class<SliderClusterProtocolPB> sliderClusterAPIClass = registerSliderAPI(
+ conf);
+ RPC.Server server = new RPC.Builder(conf).setProtocol(sliderClusterAPIClass)
+ .setInstance(blockingService)
+ .setBindAddress(addr.getAddress()
+ .getCanonicalHostName())
+ .setPort(addr.getPort())
+ .setNumHandlers(numHandlers)
+ .setVerbose(false)
+ .setSecretManager(secretManager)
+ .setPortRangeConfig(
+ portRangeConfig)
+ .build();
+ log.debug(
+ "Adding protocol " + sliderClusterAPIClass.getCanonicalName() + " to the server");
+ server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, sliderClusterAPIClass,
+ blockingService);
+ return server;
+ }
+
+ /**
+  * Register {@link ProtobufRpcEngine} in the configuration as the RPC engine
+  * for the Slider cluster protocol. Harmless and inexpensive if repeated.
+  * @param conf configuration to patch
+  * @return the protocol class
+  */
+ public static Class<SliderClusterProtocolPB> registerSliderAPI(
+     Configuration conf) {
+   final Class<SliderClusterProtocolPB> protocol =
+       SliderClusterProtocolPB.class;
+   RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class);
+
+   // sanity check; only evaluated when JVM assertions are enabled
+   assert verifyBondedToProtobuf(conf, protocol);
+
+   return protocol;
+ }
+
+ /**
+  * Verify that the conf is set up for protobuf transport of Slider RPC.
+  * Looks up the class stored under {@code "rpc.engine." + <protocol class name>},
+  * passing {@code RpcEngine.class} as the default value.
+  * @param conf configuration
+  * @param sliderClusterAPIClass class for the API
+  * @return true if the RPC engine is protocol buffers
+  */
+ public static boolean verifyBondedToProtobuf(Configuration conf,
+     Class<SliderClusterProtocolPB> sliderClusterAPIClass) {
+   final String engineKey = "rpc.engine." + sliderClusterAPIClass.getName();
+   final Class<?> configuredEngine = conf.getClass(engineKey, RpcEngine.class);
+   return configuredEngine.equals(ProtobufRpcEngine.class);
+ }
+
+
+ /**
+  * Connect to a Slider AM RPC endpoint. The protobuf engine is registered
+  * in the configuration first; the proxy is created with the
+  * {@link RetryPolicies#TRY_ONCE_THEN_FAIL} retry policy.
+  * @param addr address of the server
+  * @param currentUser user to connect as
+  * @param conf configuration to patch and use
+  * @param rpcTimeout timeout passed to the RPC layer
+  * @return a client-side proxy wrapping the protobuf endpoint
+  * @throws IOException on any failure to create the proxy
+  */
+ public static SliderClusterProtocol connectToServer(InetSocketAddress addr,
+     UserGroupInformation currentUser,
+     Configuration conf,
+     int rpcTimeout) throws IOException {
+   final Class<SliderClusterProtocolPB> protocol = registerSliderAPI(conf);
+
+   log.debug("Connecting to Slider AM at {}", addr);
+   final ProtocolProxy<SliderClusterProtocolPB> proxy =
+       RPC.getProtocolProxy(protocol,
+           1,
+           addr,
+           currentUser,
+           conf,
+           NetUtils.getDefaultSocketFactory(conf),
+           rpcTimeout,
+           RetryPolicies.TRY_ONCE_THEN_FAIL);
+   return new SliderClusterProtocolProxy(proxy.getProxy(), addr);
+ }
+
+
+ /**
+  * Loop for a limited period trying to get the proxy; by re-reading the
+  * application report between attempts it handles AM failover.
+  * @param conf configuration to patch and use
+  * @param rmClient client of the resource manager
+  * @param application application to work with
+  * @param connectTimeout timeout for the whole proxy operation
+  * (milliseconds). Use 0 to indicate "do not attempt to wait" - fail fast.
+  * @param rpcTimeout timeout for RPCs to block during communications
+  * @return the proxy
+  * @throws IOException IO problems
+  * @throws YarnException Slider-generated exceptions related to the binding
+  * failing. This can include the application finishing or timeouts
+  * @throws InterruptedException if a sleep operation waiting for
+  * the cluster to respond is interrupted.
+  */
+ public static SliderClusterProtocol getProxy(final Configuration conf,
+     final ApplicationClientProtocol rmClient,
+     ApplicationReport application,
+     final int connectTimeout,
+     final int rpcTimeout)
+     throws IOException, YarnException, InterruptedException {
+   final ApplicationId appId = application.getApplicationId();
+   final Duration timeout = new Duration(connectTimeout);
+   timeout.start();
+   Exception lastFailure = null;
+   YarnApplicationState state = null;
+   try {
+     while (application != null) {
+       state = application.getYarnApplicationState();
+       if (!state.equals(YarnApplicationState.RUNNING)) {
+         break;
+       }
+
+       try {
+         return getProxy(conf, application, rpcTimeout);
+       } catch (IOException e) {
+         // remember the failure and retry only while waiting is allowed
+         if (connectTimeout > 0 && !timeout.getLimitExceeded()) {
+           lastFailure = e;
+         } else {
+           throw e;
+         }
+       } catch (YarnException e) {
+         if (connectTimeout > 0 && !timeout.getLimitExceeded()) {
+           lastFailure = e;
+         } else {
+           throw e;
+         }
+       }
+       // at this point: the connection attempt failed
+       log.debug("Could not connect to {}. Waiting for getting the latest AM address...",
+           appId);
+       Thread.sleep(1000);
+       // refresh the app report before the next attempt
+       final GetApplicationReportRequest request =
+           GetApplicationReportRequest.newInstance(appId);
+       application = rmClient.getApplicationReport(request).getApplicationReport();
+     }
+     // get here if the app is no longer running (or its report is gone).
+     // Raise a specific exception but init it with the previous failure
+     throw new BadClusterStateException(
+         lastFailure,
+         ErrorStrings.E_FINISHED_APPLICATION, appId, state );
+   } finally {
+     timeout.close();
+   }
+ }
+
+ /**
+  * Get a proxy from the application report: the host, RPC port and
+  * client-to-AM token of the report are handed to
+  * {@link #createProxy(Configuration, String, int, org.apache.hadoop.yarn.api.records.Token, int)}.
+  * @param conf config to use
+  * @param application app report
+  * @param rpcTimeout timeout in RPC operations
+  * @return the proxy
+  * @throws IOException
+  * @throws SliderException
+  * @throws InterruptedException
+  */
+ public static SliderClusterProtocol getProxy(final Configuration conf,
+     final ApplicationReport application,
+     final int rpcTimeout)
+     throws IOException, SliderException, InterruptedException {
+   return createProxy(conf,
+       application.getHost(),
+       application.getRpcPort(),
+       application.getClientToAMToken(),
+       rpcTimeout);
+ }
+
+ /**
+  * Create a proxy to the Slider AM at the given host and port, acting as a
+  * remote-user UGI named after the current user.
+  * @param conf config to use
+  * @param host hostname
+  * @param port port
+  * @param clientToAMToken auth token: only used in a secure cluster.
+  * converted via {@link ConverterUtils#convertFromYarn(org.apache.hadoop.yarn.api.records.Token, InetSocketAddress)}
+  * @param rpcTimeout timeout in RPC operations
+  * @return the proxy
+  * @throws SliderException if the host is unset or the port is 0
+  * @throws IOException
+  * @throws InterruptedException
+  */
+ public static SliderClusterProtocol createProxy(final Configuration conf,
+     String host,
+     int port,
+     org.apache.hadoop.yarn.api.records.Token clientToAMToken,
+     final int rpcTimeout) throws
+     SliderException,
+     IOException,
+     InterruptedException {
+   if (SliderUtils.isUnset(host) || port == 0) {
+     final String address = host + ":" + port;
+     throw new SliderException(SliderExitCodes.EXIT_CONNECTIVITY_PROBLEM,
+         "Slider instance "
+         + " isn't providing a valid address for the" +
+         " Slider RPC protocol: " + address);
+   }
+
+   final UserGroupInformation caller = UserGroupInformation.getCurrentUser();
+   final UserGroupInformation newUgi =
+       UserGroupInformation.createRemoteUser(caller.getUserName());
+   final InetSocketAddress serviceAddr =
+       NetUtils.createSocketAddrForHost(host, port);
+
+   log.debug("Connecting to {}", serviceAddr);
+   if (!UserGroupInformation.isSecurityEnabled()) {
+     // insecure cluster: no token needed, connect directly
+     return connectToServer(serviceAddr, newUgi, conf, rpcTimeout);
+   }
+
+   // secure cluster: attach the client-to-AM token and connect as newUgi
+   Preconditions.checkArgument(clientToAMToken != null,
+       "Null clientToAMToken");
+   final Token<ClientToAMTokenIdentifier> amToken =
+       ConverterUtils.convertFromYarn(clientToAMToken, serviceAddr);
+   newUgi.addToken(amToken);
+   return newUgi.doAs(
+       new PrivilegedExceptionAction<SliderClusterProtocol>() {
+         @Override
+         public SliderClusterProtocol run() throws IOException {
+           return connectToServer(serviceAddr, newUgi, conf, rpcTimeout);
+         }
+       });
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderAMPolicyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderAMPolicyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderAMPolicyProvider.java
new file mode 100644
index 0000000..a40078a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderAMPolicyProvider.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.server.appmaster.rpc;
+
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.slider.common.SliderXmlConfKeys;
+
+/**
+ * {@link PolicyProvider} for Slider protocols: exposes a single
+ * {@link Service} entry that pairs {@link SliderXmlConfKeys#KEY_PROTOCOL_ACL}
+ * with {@link SliderClusterProtocolPB}.
+ */
+public class SliderAMPolicyProvider extends PolicyProvider {
+
+  /** The one service entry handed out by {@link #getServices()}. */
+  private static final Service[] SLIDER_SERVICES = {
+      new Service(SliderXmlConfKeys.KEY_PROTOCOL_ACL,
+          SliderClusterProtocolPB.class)
+  };
+
+  /**
+   * @return the shared service array itself (not a copy)
+   */
+  @SuppressWarnings("ReturnOfCollectionOrArrayField")
+  @Override
+  public Service[] getServices() {
+    return SLIDER_SERVICES;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPB.java
new file mode 100644
index 0000000..7d237de
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPB.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.rpc;
+
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.api.proto.SliderClusterAPI;
+
+public interface SliderClusterProtocolPB extends SliderClusterAPI.SliderClusterProtocolPB.BlockingInterface{
+
+ long versionID = SliderClusterProtocol.versionID;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
new file mode 100644
index 0000000..f0d9063
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.rpc;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.api.proto.Messages;
+
+import java.io.IOException;
+
+/**
+ * Server-side relay from Protobuf to internal RPC.
+ * <p>
+ * Every protocol method forwards the request message to the wrapped
+ * {@link SliderClusterProtocol} and converts any exception it raises into a
+ * {@link ServiceException}. The {@link RpcController} argument is not used.
+ */
+public class SliderClusterProtocolPBImpl implements SliderClusterProtocolPB {
+
+  /** The implementation every call is relayed to; set once at construction. */
+  private final SliderClusterProtocol real;
+
+  /**
+   * @param real the protocol implementation to relay to
+   */
+  public SliderClusterProtocolPBImpl(SliderClusterProtocol real) {
+    this.real = real;
+  }
+
+  /**
+   * Convert an exception into a {@link ServiceException}.
+   * @param e exception raised by the relayed call
+   * @return {@code e} itself if it already is a ServiceException, otherwise
+   * a new ServiceException with {@code e} as its cause
+   */
+  private static ServiceException wrap(Exception e) {
+    if (e instanceof ServiceException) {
+      return (ServiceException) e;
+    }
+    return new ServiceException(e);
+  }
+
+  /**
+   * @param protocol ignored
+   * @param clientVersion ignored
+   * @return {@link SliderClusterProtocol#versionID}
+   * @throws IOException declared for callers; not thrown by this implementation
+   */
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return SliderClusterProtocol.versionID;
+  }
+
+  @Override
+  public Messages.StopClusterResponseProto stopCluster(RpcController controller,
+      Messages.StopClusterRequestProto request) throws ServiceException {
+    try {
+      return real.stopCluster(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.UpgradeContainersResponseProto upgradeContainers(RpcController controller,
+      Messages.UpgradeContainersRequestProto request) throws ServiceException {
+    try {
+      return real.upgradeContainers(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.FlexClusterResponseProto flexCluster(RpcController controller,
+      Messages.FlexClusterRequestProto request) throws ServiceException {
+    try {
+      return real.flexCluster(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.GetJSONClusterStatusResponseProto getJSONClusterStatus(
+      RpcController controller,
+      Messages.GetJSONClusterStatusRequestProto request) throws ServiceException {
+    try {
+      return real.getJSONClusterStatus(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition(
+      RpcController controller,
+      Messages.GetInstanceDefinitionRequestProto request)
+      throws ServiceException {
+    try {
+      return real.getInstanceDefinition(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(
+      RpcController controller,
+      Messages.ListNodeUUIDsByRoleRequestProto request) throws ServiceException {
+    try {
+      return real.listNodeUUIDsByRole(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.GetNodeResponseProto getNode(RpcController controller,
+      Messages.GetNodeRequestProto request) throws ServiceException {
+    try {
+      return real.getNode(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.GetClusterNodesResponseProto getClusterNodes(RpcController controller,
+      Messages.GetClusterNodesRequestProto request) throws ServiceException {
+    try {
+      return real.getClusterNodes(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.EchoResponseProto echo(RpcController controller,
+      Messages.EchoRequestProto request) throws ServiceException {
+    try {
+      return real.echo(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.KillContainerResponseProto killContainer(RpcController controller,
+      Messages.KillContainerRequestProto request) throws ServiceException {
+    try {
+      return real.killContainer(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.AMSuicideResponseProto amSuicide(RpcController controller,
+      Messages.AMSuicideRequestProto request) throws ServiceException {
+    try {
+      return real.amSuicide(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.ApplicationLivenessInformationProto getLivenessInformation(
+      RpcController controller,
+      Messages.GetApplicationLivenessRequestProto request) throws ServiceException {
+    try {
+      return real.getLivenessInformation(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.GetLiveContainersResponseProto getLiveContainers(RpcController controller,
+      Messages.GetLiveContainersRequestProto request) throws ServiceException {
+    try {
+      return real.getLiveContainers(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.ContainerInformationProto getLiveContainer(RpcController controller,
+      Messages.GetLiveContainerRequestProto request) throws ServiceException {
+    try {
+      return real.getLiveContainer(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.GetLiveComponentsResponseProto getLiveComponents(RpcController controller,
+      Messages.GetLiveComponentsRequestProto request) throws ServiceException {
+    try {
+      return real.getLiveComponents(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.ComponentInformationProto getLiveComponent(RpcController controller,
+      Messages.GetLiveComponentRequestProto request) throws ServiceException {
+    try {
+      return real.getLiveComponent(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.GetLiveNodesResponseProto getLiveNodes(RpcController controller,
+      Messages.GetLiveNodesRequestProto request) throws ServiceException {
+    try {
+      return real.getLiveNodes(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.NodeInformationProto getLiveNode(RpcController controller,
+      Messages.GetLiveNodeRequestProto request) throws ServiceException {
+    try {
+      return real.getLiveNode(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.WrappedJsonProto getModelDesired(RpcController controller,
+      Messages.EmptyPayloadProto request) throws ServiceException {
+    try {
+      return real.getModelDesired(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.WrappedJsonProto getModelDesiredAppconf(RpcController controller,
+      Messages.EmptyPayloadProto request) throws ServiceException {
+    try {
+      return real.getModelDesiredAppconf(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.WrappedJsonProto getModelDesiredResources(RpcController controller,
+      Messages.EmptyPayloadProto request) throws ServiceException {
+    try {
+      return real.getModelDesiredResources(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.WrappedJsonProto getModelResolved(RpcController controller,
+      Messages.EmptyPayloadProto request) throws ServiceException {
+    try {
+      return real.getModelResolved(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.WrappedJsonProto getModelResolvedAppconf(RpcController controller,
+      Messages.EmptyPayloadProto request) throws ServiceException {
+    try {
+      return real.getModelResolvedAppconf(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.WrappedJsonProto getModelResolvedResources(RpcController controller,
+      Messages.EmptyPayloadProto request) throws ServiceException {
+    try {
+      return real.getModelResolvedResources(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.WrappedJsonProto getLiveResources(RpcController controller,
+      Messages.EmptyPayloadProto request) throws ServiceException {
+    try {
+      return real.getLiveResources(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
+  public Messages.GetCertificateStoreResponseProto getClientCertificateStore(
+      RpcController controller,
+      Messages.GetCertificateStoreRequestProto request)
+      throws ServiceException {
+    try {
+      return real.getClientCertificateStore(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
new file mode 100644
index 0000000..b230816
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.rpc;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.api.proto.Messages;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Implements {@link SliderClusterProtocol} by forwarding each request to a
+ * {@link SliderClusterProtocolPB} endpoint and translating any
+ * {@link ServiceException} the endpoint raises into an {@link IOException}.
+ */
+public class SliderClusterProtocolProxy implements SliderClusterProtocol {
+
+ /** Controller argument passed on every endpoint call; always null. */
+ private static final RpcController NULL_CONTROLLER = null;
+ /** Endpoint that the request methods delegate to. */
+ private final SliderClusterProtocolPB endpoint;
+ /** Address supplied at construction; included in toString(). */
+ private final InetSocketAddress address;
+
+ /**
+  * Create a proxy bound to an endpoint.
+  *
+  * @param endpoint endpoint that the request methods forward to
+  * @param address address reported by {@link #toString()}
+  * @throws IllegalArgumentException if either argument is null
+  */
+ public SliderClusterProtocolProxy(SliderClusterProtocolPB endpoint,
+     InetSocketAddress address) {
+   Preconditions.checkArgument(endpoint != null, "null endpoint");
+   // checkNotNull(address != null, ...) only inspected the boxed boolean,
+   // which is never null, so a null address used to be accepted silently.
+   Preconditions.checkArgument(address != null, "null address");
+   this.endpoint = endpoint;
+   this.address = address;
+ }
+
+ /**
+  * Describe this proxy by its address.
+  *
+  * @return {@code SliderClusterProtocolProxy{address=...}}
+  */
+ @Override
+ public String toString() {
+   return "SliderClusterProtocolProxy{" + "address=" + address + '}';
+ }
+
+ /**
+  * Return the protocol signature, rejecting any protocol name other than
+  * the one registered for {@link SliderClusterProtocolPB}.
+  *
+  * @param protocol name of the protocol the caller is asking about
+  * @param clientVersion not used by this implementation
+  * @param clientMethodsHash hash handed to
+  *        {@code ProtocolSignature.getProtocolSignature}
+  * @return signature built for {@link SliderClusterProtocol}
+  * @throws IOException if {@code protocol} is not the expected name
+  */
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+     long clientVersion,
+     int clientMethodsHash) throws IOException {
+   final String expected =
+       RPC.getProtocolName(SliderClusterProtocolPB.class);
+   if (protocol.equals(expected)) {
+     final long version =
+         RPC.getProtocolVersion(SliderClusterProtocol.class);
+     return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+         version,
+         SliderClusterProtocol.class);
+   }
+   throw new IOException("Serverside implements " +
+       expected +
+       ". The following requested protocol is unknown: " +
+       protocol);
+ }
+
+ /**
+  * Report the protocol version; both arguments are ignored.
+  *
+  * @param protocol not used
+  * @param clientVersion not used
+  * @return {@code SliderClusterProtocol.versionID}
+  * @throws IOException declared by the signature; not thrown by this body
+  */
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+     throws IOException {
+   final long version = SliderClusterProtocol.versionID;
+   return version;
+ }
+
+ private IOException convert(ServiceException se) {
+ IOException ioe = ProtobufHelper.getRemoteException(se);
+ if (ioe instanceof RemoteException) {
+ RemoteException remoteException = (RemoteException) ioe;
+ return remoteException.unwrapRemoteException();
+ }
+ return ioe;
+ }
+
+ @Override
+ public Messages.StopClusterResponseProto stopCluster(Messages.StopClusterRequestProto request) throws
+ IOException,
+ YarnException {
+ try {
+ return endpoint.stopCluster(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ /**
+  * Forward {@code upgradeContainers} to the endpoint with a null controller.
+  *
+  * @param request message handed to the endpoint unchanged
+  * @return the endpoint's response
+  * @throws IOException the result of {@link #convert(ServiceException)}
+  *         when the endpoint throws a {@link ServiceException}
+  * @throws YarnException declared by the signature; not thrown by this body
+  */
+ @Override
+ public Messages.UpgradeContainersResponseProto upgradeContainers(
+     Messages.UpgradeContainersRequestProto request)
+     throws IOException, YarnException {
+   try {
+     return endpoint.upgradeContainers(NULL_CONTROLLER, request);
+   } catch (ServiceException se) {
+     throw convert(se);
+   }
+ }
+
+ @Override
+ public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request)
+ throws IOException {
+ try {
+ return endpoint.flexCluster(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ /**
+  * Forward {@code getJSONClusterStatus} to the endpoint with a null
+  * controller.
+  *
+  * @param request message handed to the endpoint unchanged
+  * @return the endpoint's response
+  * @throws IOException the result of {@link #convert(ServiceException)}
+  *         when the endpoint throws a {@link ServiceException}
+  * @throws YarnException declared by the signature; not thrown by this body
+  */
+ @Override
+ public Messages.GetJSONClusterStatusResponseProto getJSONClusterStatus(
+     Messages.GetJSONClusterStatusRequestProto request)
+     throws IOException, YarnException {
+   try {
+     return endpoint.getJSONClusterStatus(NULL_CONTROLLER, request);
+   } catch (ServiceException se) {
+     throw convert(se);
+   }
+ }
+
+
+ /**
+  * Forward {@code getInstanceDefinition} to the endpoint with a null
+  * controller.
+  *
+  * @param request message handed to the endpoint unchanged
+  * @return the endpoint's response
+  * @throws IOException the result of {@link #convert(ServiceException)}
+  *         when the endpoint throws a {@link ServiceException}
+  * @throws YarnException declared by the signature; not thrown by this body
+  */
+ @Override
+ public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition(
+     Messages.GetInstanceDefinitionRequestProto request)
+     throws IOException, YarnException {
+   try {
+     return endpoint.getInstanceDefinition(NULL_CONTROLLER, request);
+   } catch (ServiceException se) {
+     throw convert(se);
+   }
+ }
+
+ @Override
+ public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(Messages.ListNodeUUIDsByRoleRequestProto request) throws
+ IOException,
+ YarnException {
+ try {
+ return endpoint.listNodeUUIDsByRole(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.GetNodeResponseProto getNode(Messages.GetNodeRequestProto request) throws
+ IOException,
+ YarnException {
+ try {
+ return endpoint.getNode(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.GetClusterNodesResponseProto getClusterNodes(Messages.GetClusterNodesRequestProto request) throws
+ IOException,
+ YarnException {
+ try {
+ return endpoint.getClusterNodes(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+
+ @Override
+ public Messages.EchoResponseProto echo(Messages.EchoRequestProto request) throws
+ IOException,
+ YarnException {
+ try {
+ return endpoint.echo(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+
+ @Override
+ public Messages.KillContainerResponseProto killContainer(Messages.KillContainerRequestProto request) throws
+ IOException,
+ YarnException {
+ try {
+ return endpoint.killContainer(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.AMSuicideResponseProto amSuicide(Messages.AMSuicideRequestProto request) throws
+ IOException {
+ try {
+ return endpoint.amSuicide(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ /**
+  * Forward {@code getLivenessInformation} to the endpoint with a null
+  * controller.
+  *
+  * @param request message handed to the endpoint unchanged
+  * @return the endpoint's response
+  * @throws IOException the result of {@link #convert(ServiceException)}
+  *         when the endpoint throws a {@link ServiceException}
+  */
+ @Override
+ public Messages.ApplicationLivenessInformationProto getLivenessInformation(
+     Messages.GetApplicationLivenessRequestProto request)
+     throws IOException {
+   try {
+     return endpoint.getLivenessInformation(NULL_CONTROLLER, request);
+   } catch (ServiceException se) {
+     throw convert(se);
+   }
+ }
+
+ @Override
+ public Messages.GetLiveContainersResponseProto getLiveContainers(Messages.GetLiveContainersRequestProto request) throws
+ IOException {
+ try {
+ return endpoint.getLiveContainers(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.ContainerInformationProto getLiveContainer(Messages.GetLiveContainerRequestProto request) throws
+ IOException {
+ try {
+ return endpoint.getLiveContainer(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.GetLiveComponentsResponseProto getLiveComponents(Messages.GetLiveComponentsRequestProto request) throws
+ IOException {
+ try {
+ return endpoint.getLiveComponents(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.ComponentInformationProto getLiveComponent(Messages.GetLiveComponentRequestProto request) throws
+ IOException {
+ try {
+ return endpoint.getLiveComponent(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.GetLiveNodesResponseProto getLiveNodes(Messages.GetLiveNodesRequestProto request)
+ throws IOException {
+ try {
+ return endpoint.getLiveNodes(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.NodeInformationProto getLiveNode(Messages.GetLiveNodeRequestProto request)
+ throws IOException {
+ try {
+ return endpoint.getLiveNode(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelDesired(Messages.EmptyPayloadProto request) throws IOException {
+ try {
+ return endpoint.getModelDesired(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelDesiredAppconf(Messages.EmptyPayloadProto request) throws IOException {
+ try {
+ return endpoint.getModelDesiredAppconf(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelDesiredResources(Messages.EmptyPayloadProto request) throws IOException {
+ try {
+ return endpoint.getModelDesiredResources(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelResolved(Messages.EmptyPayloadProto request) throws IOException {
+ try {
+ return endpoint.getModelResolved(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelResolvedAppconf(Messages.EmptyPayloadProto request) throws IOException {
+ try {
+ return endpoint.getModelResolvedAppconf(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelResolvedResources(Messages.EmptyPayloadProto request) throws IOException {
+ try {
+ return endpoint.getModelResolvedResources(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getLiveResources(Messages.EmptyPayloadProto request) throws IOException {
+ try {
+ return endpoint.getLiveResources(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+
+ }
+
+ @Override
+ public Messages.GetCertificateStoreResponseProto getClientCertificateStore(Messages.GetCertificateStoreRequestProto request) throws
+ IOException {
+ try {
+ return endpoint.getClientCertificateStore(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw convert(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
new file mode 100644
index 0000000..fda23aa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.rpc;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.api.SliderClusterProtocol;
+import org.apache.slider.api.proto.Messages;
+import org.apache.slider.api.types.ApplicationLivenessInformation;
+import org.apache.slider.api.types.ComponentInformation;
+import org.apache.slider.api.types.ContainerInformation;
+import org.apache.slider.api.types.NodeInformation;
+import org.apache.slider.api.types.NodeInformationList;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTree;
+import org.apache.slider.core.exceptions.ServiceNotReadyException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.main.LauncherExitCodes;
+import org.apache.slider.core.persist.AggregateConfSerDeser;
+import org.apache.slider.core.persist.ConfTreeSerDeser;
+import org.apache.slider.server.appmaster.AppMasterActionOperations;
+import org.apache.slider.server.appmaster.actions.ActionFlexCluster;
+import org.apache.slider.server.appmaster.actions.ActionHalt;
+import org.apache.slider.server.appmaster.actions.ActionKillContainer;
+import org.apache.slider.server.appmaster.actions.ActionStopSlider;
+import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers;
+import org.apache.slider.server.appmaster.actions.AsyncAction;
+import org.apache.slider.server.appmaster.actions.QueueAccess;
+import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
+import org.apache.slider.server.appmaster.state.RoleInstance;
+import org.apache.slider.server.appmaster.state.StateAccessForProviders;
+import org.apache.slider.server.appmaster.web.rest.application.resources.ContentCache;
+import org.apache.slider.server.services.security.CertificateManager;
+import org.apache.slider.server.services.security.SecurityStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.slider.api.proto.RestTypeMarshalling.marshall;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_COMPONENTS;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_CONTAINERS;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_NODES;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.LIVE_RESOURCES;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_DESIRED;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_DESIRED_APPCONF;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_DESIRED_RESOURCES;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_RESOLVED;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_RESOLVED_APPCONF;
+import static org.apache.slider.server.appmaster.web.rest.RestPaths.MODEL_RESOLVED_RESOURCES;
+
+/**
+ * Implement the {@link SliderClusterProtocol}.
+ * Requests are answered from the application state and the content cache,
+ * or turned into actions that are handed to the action queues.
+ */
+// Class-wide suppression: several methods cast the untyped result of
+// cache.lookupWithIOE() to a parameterized Map.
+@SuppressWarnings("unchecked")
+
+public class SliderIPCService extends AbstractService
+ implements SliderClusterProtocol {
+
+ protected static final Logger log =
+ LoggerFactory.getLogger(SliderIPCService.class);
+
+ // Queues that schedule() and queue() hand actions to.
+ private final QueueAccess actionQueues;
+ // State view queried for cluster status, instances, components and nodes.
+ private final StateAccessForProviders state;
+ // Receives one meter/counter mark per call to onRpcCall().
+ private final MetricsAndMonitoring metricsAndMonitoring;
+ // Passed to the ActionKillContainer built in killContainer().
+ private final AppMasterActionOperations amOperations;
+ // Source of the live/model content returned by the lookup methods.
+ private final ContentCache cache;
+ // Generates the stores in getClientCertificateStore(); the constructor
+ // does not null-check it.
+ private final CertificateManager certificateManager;
+
+ /**
+ * This is the prefix used for metrics; onRpcCall() prepends it to the
+ * operation name.
+ */
+ public static final String METRICS_PREFIX =
+ "org.apache.slider.api.SliderIPCService.";
+
+ /**
+  * Build the service from its collaborators. Every argument except
+  * {@code certificateManager} is checked for null.
+  *
+  * @param amOperations access to any AM operations
+  * @param certificateManager generator of the stores returned by
+  *        {@code getClientCertificateStore}; not null-checked here
+  * @param state state view
+  * @param actionQueues queues for actions
+  * @param metricsAndMonitoring metrics
+  * @param cache content cache used by the lookup methods
+  * @throws IllegalArgumentException if a checked argument is null
+  */
+ public SliderIPCService(AppMasterActionOperations amOperations,
+     CertificateManager certificateManager,
+     StateAccessForProviders state,
+     QueueAccess actionQueues,
+     MetricsAndMonitoring metricsAndMonitoring,
+     ContentCache cache) {
+   super("SliderIPCService");
+
+   // validation order is kept so the first failing check is unchanged
+   Preconditions.checkArgument(amOperations != null, "null amOperations");
+   Preconditions.checkArgument(state != null, "null appState");
+   Preconditions.checkArgument(actionQueues != null, "null actionQueues");
+   Preconditions.checkArgument(metricsAndMonitoring != null,
+       "null metricsAndMonitoring");
+   Preconditions.checkArgument(cache != null, "null cache");
+
+   this.amOperations = amOperations;
+   this.certificateManager = certificateManager;
+   this.state = state;
+   this.actionQueues = actionQueues;
+   this.metricsAndMonitoring = metricsAndMonitoring;
+   this.cache = cache;
+ }
+
+ /**
+  * Build the protocol signature for this service instance by delegating
+  * to {@code ProtocolSignature.getProtocolSignature}.
+  *
+  * @param protocol protocol name supplied by the caller
+  * @param clientVersion version supplied by the caller
+  * @param clientMethodsHash methods hash supplied by the caller
+  * @return the signature computed by {@link ProtocolSignature}
+  * @throws IOException if the delegate raises it
+  */
+ @Override //SliderClusterProtocol
+ public ProtocolSignature getProtocolSignature(String protocol,
+     long clientVersion,
+     int clientMethodsHash) throws IOException {
+   final ProtocolSignature signature =
+       ProtocolSignature.getProtocolSignature(
+           this, protocol, clientVersion, clientMethodsHash);
+   return signature;
+ }
+
+
+ /**
+  * Report the protocol version; both arguments are ignored.
+  *
+  * @param protocol not used
+  * @param clientVersion not used
+  * @return {@code SliderClusterProtocol.versionID}
+  * @throws IOException declared by the signature; not thrown by this body
+  */
+ @Override //SliderClusterProtocol
+ public long getProtocolVersion(String protocol, long clientVersion)
+     throws IOException {
+   final long version = SliderClusterProtocol.versionID;
+   return version;
+ }
+
+ /**
+ * General actions to perform on a slider RPC call coming in
+ * @param operation operation to log
+ * @throws IOException problems
+ * @throws ServiceNotReadyException if the RPC service is constructed
+ * but not fully initialized
+ */
+ protected void onRpcCall(String operation) throws IOException {
+ log.debug("Received call to {}", operation);
+ metricsAndMonitoring.markMeterAndCounter(METRICS_PREFIX + operation);
+ }
+
+ /**
+  * Pass an action to {@code actionQueues.schedule()}.
+  *
+  * @param action action for delayed execution
+  */
+ public void schedule(AsyncAction action) {
+   final QueueAccess queues = actionQueues;
+   queues.schedule(action);
+ }
+
+ /**
+  * Pass an action to {@code actionQueues.put()}; the original author
+  * describes this as immediate execution in the executor thread.
+  *
+  * @param action action to execute
+  */
+ public void queue(AsyncAction action) {
+   final QueueAccess queues = actionQueues;
+   queues.put(action);
+ }
+
+ @Override //SliderClusterProtocol
+ public Messages.StopClusterResponseProto stopCluster(Messages.StopClusterRequestProto request)
+ throws IOException, YarnException {
+ onRpcCall("stop");
+ String message = request.getMessage();
+ if (message == null) {
+ message = "application stopped by client";
+ }
+ ActionStopSlider stopSlider =
+ new ActionStopSlider(message,
+ 1000, TimeUnit.MILLISECONDS,
+ LauncherExitCodes.EXIT_SUCCESS,
+ FinalApplicationStatus.SUCCEEDED,
+ message);
+ log.info("SliderAppMasterApi.stopCluster: {}", stopSlider);
+ schedule(stopSlider);
+ return Messages.StopClusterResponseProto.getDefaultInstance();
+ }
+
+ /**
+  * Schedule an upgrade of the containers and components named in the
+  * request and reply with the default response. The action is scheduled
+  * with a 1000 ms delay.
+  *
+  * @param request request carrying the container list, component list and
+  *        message; a missing or empty message is replaced by a default text
+  * @return the default {@code UpgradeContainersResponseProto} instance
+  * @throws IOException if {@link #onRpcCall(String)} raises it
+  * @throws YarnException declared by the signature; not thrown by this body
+  */
+ @Override //SliderClusterProtocol
+ public Messages.UpgradeContainersResponseProto upgradeContainers(
+     Messages.UpgradeContainersRequestProto request)
+     throws IOException, YarnException {
+   onRpcCall("upgrade");
+   String message = request.getMessage();
+   // Generated protobuf string getters return "" rather than null, so the
+   // null test alone never selected the default text.
+   if (message == null || message.isEmpty()) {
+     message = "application containers upgraded by client";
+   }
+   ActionUpgradeContainers upgradeContainers =
+       new ActionUpgradeContainers(
+           "Upgrade containers",
+           1000, TimeUnit.MILLISECONDS,
+           LauncherExitCodes.EXIT_SUCCESS,
+           FinalApplicationStatus.SUCCEEDED,
+           request.getContainerList(),
+           request.getComponentList(),
+           message);
+   log.info("SliderAppMasterApi.upgradeContainers: {}", upgradeContainers);
+   schedule(upgradeContainers);
+   return Messages.UpgradeContainersResponseProto.getDefaultInstance();
+ }
+
+ @Override //SliderClusterProtocol
+ public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request)
+ throws IOException {
+ onRpcCall("flex");
+ String payload = request.getClusterSpec();
+ ConfTreeSerDeser confTreeSerDeser = new ConfTreeSerDeser();
+ ConfTree updatedResources = confTreeSerDeser.fromJson(payload);
+ schedule(new ActionFlexCluster("flex", 1, TimeUnit.MILLISECONDS,
+ updatedResources));
+ return Messages.FlexClusterResponseProto.newBuilder().setResponse(
+ true).build();
+ }
+
+ /**
+  * Refresh the cluster status and return it as JSON.
+  *
+  * @param request not read by this method
+  * @return response whose cluster spec is the JSON form of the refreshed
+  *         {@link ClusterDescription}
+  * @throws IOException if {@link #onRpcCall(String)} or the JSON
+  *         conversion raises it
+  * @throws YarnException declared by the signature
+  */
+ @Override //SliderClusterProtocol
+ public Messages.GetJSONClusterStatusResponseProto getJSONClusterStatus(
+     Messages.GetJSONClusterStatusRequestProto request)
+     throws IOException, YarnException {
+   onRpcCall("getstatus");
+   // refresh first, then json-ify the refreshed description
+   final ClusterDescription description = state.refreshClusterStatus();
+   final String json = description.toJsonString();
+   return Messages.GetJSONClusterStatusResponseProto.newBuilder()
+       .setClusterSpec(json)
+       .build();
+ }
+
+ /**
+  * Return the internal, resources and application sections of the current
+  * instance definition snapshot, each serialized with {@code toJson()}.
+  *
+  * @param request not read by this method
+  * @return response carrying the three JSON documents
+  * @throws IOException if {@link #onRpcCall(String)} or the JSON
+  *         conversion raises it
+  * @throws YarnException declared by the signature
+  */
+ @Override
+ public Messages.GetInstanceDefinitionResponseProto getInstanceDefinition(
+     Messages.GetInstanceDefinitionRequestProto request)
+     throws IOException, YarnException {
+   onRpcCall("getinstancedefinition");
+   final AggregateConf snapshot = state.getInstanceDefinitionSnapshot();
+   final String internal = snapshot.getInternal().toJson();
+   final String resources = snapshot.getResources().toJson();
+   final String app = snapshot.getAppConf().toJson();
+   assert internal != null;
+   assert resources != null;
+   assert app != null;
+   log.debug("Generating getInstanceDefinition Response");
+   return Messages.GetInstanceDefinitionResponseProto.newBuilder()
+       .setInternal(internal)
+       .setResources(resources)
+       .setApplication(app)
+       .build();
+ }
+
+ @Override //SliderClusterProtocol
+ public Messages.ListNodeUUIDsByRoleResponseProto listNodeUUIDsByRole(Messages.ListNodeUUIDsByRoleRequestProto request)
+ throws IOException, YarnException {
+ onRpcCall("listnodes)");
+ String role = request.getRole();
+ Messages.ListNodeUUIDsByRoleResponseProto.Builder builder =
+ Messages.ListNodeUUIDsByRoleResponseProto.newBuilder();
+ List<RoleInstance> nodes = state.enumLiveInstancesInRole(role);
+ for (RoleInstance node : nodes) {
+ builder.addUuid(node.id);
+ }
+ return builder.build();
+ }
+
+ @Override //SliderClusterProtocol
+ public Messages.GetNodeResponseProto getNode(Messages.GetNodeRequestProto request)
+ throws IOException, YarnException {
+ onRpcCall("getnode");
+ RoleInstance instance = state.getLiveInstanceByContainerID(
+ request.getUuid());
+ return Messages.GetNodeResponseProto.newBuilder()
+ .setClusterNode(instance.toProtobuf())
+ .build();
+ }
+
+ /**
+  * Look up the live instances for the UUIDs in the request and return
+  * their protobuf forms.
+  *
+  * @param request request carrying the UUID list
+  * @return response with one cluster node per instance found; the list
+  *         may be empty
+  * @throws IOException if {@link #onRpcCall(String)} raises it
+  * @throws YarnException if the state lookup raises it
+  */
+ @Override //SliderClusterProtocol
+ public Messages.GetClusterNodesResponseProto getClusterNodes(
+     Messages.GetClusterNodesRequestProto request)
+     throws IOException, YarnException {
+   onRpcCall("getclusternodes");
+   final List<RoleInstance> instances =
+       state.getLiveInstancesByContainerIDs(request.getUuidList());
+   final Messages.GetClusterNodesResponseProto.Builder response =
+       Messages.GetClusterNodesResponseProto.newBuilder();
+   for (RoleInstance instance : instances) {
+     response.addClusterNode(instance.toProtobuf());
+   }
+   return response.build();
+ }
+
+ /**
+  * Log the request text and its length at info level, then send the same
+  * text back.
+  *
+  * @param request request carrying the text
+  * @return response carrying the same text
+  * @throws IOException if {@link #onRpcCall(String)} raises it
+  * @throws YarnException declared by the signature; not thrown by this body
+  */
+ @Override
+ public Messages.EchoResponseProto echo(Messages.EchoRequestProto request)
+     throws IOException, YarnException {
+   onRpcCall("echo");
+   final String text = request.getText();
+   log.info("Echo request size ={}", text.length());
+   log.info(text);
+   return Messages.EchoResponseProto.newBuilder()
+       .setText(text)
+       .build();
+ }
+
+ @Override
+ public Messages.KillContainerResponseProto killContainer(Messages.KillContainerRequestProto request)
+ throws IOException, YarnException {
+ onRpcCall("killcontainer");
+ String containerID = request.getId();
+ log.info("Kill Container {}", containerID);
+ //throws NoSuchNodeException if it is missing
+ RoleInstance instance =
+ state.getLiveInstanceByContainerID(containerID);
+ queue(new ActionKillContainer(instance.getId(), 0, TimeUnit.MILLISECONDS,
+ amOperations));
+ Messages.KillContainerResponseProto.Builder builder =
+ Messages.KillContainerResponseProto.newBuilder();
+ builder.setSuccess(true);
+ return builder.build();
+ }
+
+
+ /**
+  * Schedule a halt action using the signal, text and delay (in
+  * milliseconds) taken from the request.
+  *
+  * @param request request carrying signal, text and delay
+  * @return the default {@code AMSuicideResponseProto} instance
+  * @throws IOException if {@link #onRpcCall(String)} raises it
+  */
+ @Override
+ public Messages.AMSuicideResponseProto amSuicide(
+     Messages.AMSuicideRequestProto request) throws IOException {
+   onRpcCall("amsuicide");
+   final int signal = request.getSignal();
+   final String requested = request.getText();
+   final String text = (requested == null) ? "" : requested;
+   final int delay = request.getDelay();
+   log.info("AM Suicide with signal {}, message {} delay = {}", signal, text,
+       delay);
+   schedule(new ActionHalt(signal, text, delay, TimeUnit.MILLISECONDS));
+   return Messages.AMSuicideResponseProto.getDefaultInstance();
+ }
+
+ /**
+  * Return the application liveness information held by the state view,
+  * marshalled to its protobuf form.
+  *
+  * @param request not read by this method
+  * @return the marshalled liveness information
+  * @throws IOException declared by the signature
+  */
+ @Override
+ public Messages.ApplicationLivenessInformationProto getLivenessInformation(
+     Messages.GetApplicationLivenessRequestProto request)
+     throws IOException {
+   return marshall(state.getApplicationLivenessInformation());
+ }
+
+ /**
+  * Return every entry cached under {@code LIVE_CONTAINERS} as parallel
+  * lists of names and marshalled container information.
+  *
+  * @param request not read by this method
+  * @return response with one name and one container per cache entry
+  * @throws IOException if the cache lookup raises it
+  */
+ @Override
+ public Messages.GetLiveContainersResponseProto getLiveContainers(
+     Messages.GetLiveContainersRequestProto request)
+     throws IOException {
+   final Map<String, ContainerInformation> containers =
+       (Map<String, ContainerInformation>) cache.lookupWithIOE(LIVE_CONTAINERS);
+   final Messages.GetLiveContainersResponseProto.Builder response =
+       Messages.GetLiveContainersResponseProto.newBuilder();
+   for (Map.Entry<String, ContainerInformation> container
+       : containers.entrySet()) {
+     response.addNames(container.getKey());
+     response.addContainers(marshall(container.getValue()));
+   }
+   return response.build();
+ }
+
+ @Override
+ public Messages.ContainerInformationProto getLiveContainer(Messages.GetLiveContainerRequestProto request)
+ throws IOException {
+ String containerId = request.getContainerId();
+ RoleInstance id = state.getLiveInstanceByContainerID(containerId);
+ ContainerInformation containerInformation = id.serialize();
+ return marshall(containerInformation);
+ }
+
+ @Override
+ public Messages.GetLiveComponentsResponseProto getLiveComponents(Messages.GetLiveComponentsRequestProto request)
+ throws IOException {
+ Map<String, ComponentInformation> infoMap =
+ (Map<String, ComponentInformation>) cache.lookupWithIOE(LIVE_COMPONENTS);
+ Messages.GetLiveComponentsResponseProto.Builder builder =
+ Messages.GetLiveComponentsResponseProto.newBuilder();
+
+ for (Map.Entry<String, ComponentInformation> entry : infoMap.entrySet()) {
+ builder.addNames(entry.getKey());
+ builder.addComponents(marshall(entry.getValue()));
+ }
+ return builder.build();
+ }
+
+
+ @Override
+ public Messages.ComponentInformationProto getLiveComponent(Messages.GetLiveComponentRequestProto request)
+ throws IOException {
+ String name = request.getName();
+ try {
+ return marshall(state.getComponentInformation(name));
+ } catch (YarnRuntimeException e) {
+ throw new FileNotFoundException("Unknown component: " + name);
+ }
+ }
+
+ @Override
+ public Messages.GetLiveNodesResponseProto getLiveNodes(Messages.GetLiveNodesRequestProto request)
+ throws IOException {
+ NodeInformationList info = (NodeInformationList) cache.lookupWithIOE(LIVE_NODES);
+ Messages.GetLiveNodesResponseProto.Builder builder =
+ Messages.GetLiveNodesResponseProto.newBuilder();
+
+ for (NodeInformation nodeInformation : info) {
+ builder.addNodes(marshall(nodeInformation));
+ }
+ return builder.build();
+ }
+
+
+ @Override
+ public Messages.NodeInformationProto getLiveNode(Messages.GetLiveNodeRequestProto request)
+ throws IOException {
+ String name = request.getName();
+ NodeInformation nodeInformation = state.getNodeInformation(name);
+ if (nodeInformation != null) {
+ return marshall(nodeInformation);
+ } else {
+ throw new FileNotFoundException("Unknown host: " + name);
+ }
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelDesired(Messages.EmptyPayloadProto request) throws IOException {
+ return lookupAggregateConf(MODEL_DESIRED);
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelDesiredAppconf(Messages.EmptyPayloadProto request) throws IOException {
+ return lookupConfTree(MODEL_DESIRED_APPCONF);
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelDesiredResources(Messages.EmptyPayloadProto request) throws IOException {
+ return lookupConfTree(MODEL_DESIRED_RESOURCES);
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelResolved(Messages.EmptyPayloadProto request) throws IOException {
+ return lookupAggregateConf(MODEL_RESOLVED);
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelResolvedAppconf(Messages.EmptyPayloadProto request) throws IOException {
+ return lookupConfTree(MODEL_RESOLVED_APPCONF);
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getModelResolvedResources(Messages.EmptyPayloadProto request) throws IOException {
+ return lookupConfTree(MODEL_RESOLVED_RESOURCES);
+ }
+
+ @Override
+ public Messages.WrappedJsonProto getLiveResources(Messages.EmptyPayloadProto request) throws IOException {
+ return lookupConfTree(LIVE_RESOURCES);
+ }
+
+ /**
+ * Helper method; look up an aggregate configuration in the cache from
+ * a key, or raise an exception
+ * @param key key to resolve
+ * @return the configuration
+ * @throws IOException on a failure
+ */
+
+ protected Messages.WrappedJsonProto lookupAggregateConf(String key) throws
+ IOException {
+ AggregateConf aggregateConf = (AggregateConf) cache.lookupWithIOE(key);
+ String json = AggregateConfSerDeser.toString(aggregateConf);
+ return wrap(json);
+ }
+
+ /**
+ * Helper method; look up an conf tree in the cache from
+ * a key, or raise an exception
+ * @param key key to resolve
+ * @return the configuration
+ * @throws IOException on a failure
+ */
+ protected Messages.WrappedJsonProto lookupConfTree(String key) throws
+ IOException {
+ ConfTree conf = (ConfTree) cache.lookupWithIOE(key);
+ String json = ConfTreeSerDeser.toString(conf);
+ return wrap(json);
+ }
+
+ /**
+  * Put a JSON string into a {@code WrappedJsonProto}.
+  *
+  * @param json text to store in the message's json field
+  * @return the built message
+  */
+ private Messages.WrappedJsonProto wrap(String json) {
+   return Messages.WrappedJsonProto.newBuilder()
+       .setJson(json)
+       .build();
+ }
+
+ @Override
+ public Messages.GetCertificateStoreResponseProto getClientCertificateStore(Messages.GetCertificateStoreRequestProto request) throws
+ IOException {
+ String hostname = request.getHostname();
+ String clientId = request.getRequesterId();
+ String password = request.getPassword();
+ String type = request.getType();
+
+ SecurityStore store = null;
+ try {
+ if ( SecurityStore.StoreType.keystore.equals(
+ SecurityStore.StoreType.valueOf(type))) {
+ store = certificateManager.generateContainerKeystore(hostname,
+ clientId,
+ null,
+ password);
+ } else if (SecurityStore.StoreType.truststore.equals(
+ SecurityStore.StoreType.valueOf(type))) {
+ store = certificateManager.generateContainerTruststore(clientId,
+ null,
+ password);
+
+ } else {
+ throw new IOException("Illegal store type");
+ }
+ } catch (SliderException e) {
+ throw new IOException(e);
+ }
+ return marshall(store);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderRPCSecurityInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderRPCSecurityInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderRPCSecurityInfo.java
new file mode 100644
index 0000000..4fd4910
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderRPCSecurityInfo.java
@@ -0,0 +1,87 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.slider.server.appmaster.rpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenSelector;
+import org.apache.slider.common.SliderXmlConfKeys;
+
+import java.lang.annotation.Annotation;
+
+/**
+ * Security binding information for {@link SliderClusterProtocolPB}.
+ * It is referred to in the
+ * <code>META-INF/services/org.apache.hadoop.security.SecurityInfo</code>
+ * resource of this JAR, which is used to find the binding info.
+ * <p>
+ * Both lookups return <code>null</code> for any other protocol class.
+ */
+public class SliderRPCSecurityInfo extends SecurityInfo {
+
+  /**
+   * Test whether a protocol is the one this class supplies information for.
+   * @param protocol protocol class being queried
+   * @return true iff the class equals {@link SliderClusterProtocolPB}
+   */
+  private static boolean isSliderProtocol(Class<?> protocol) {
+    return protocol.equals(SliderClusterProtocolPB.class);
+  }
+
+  /**
+   * Get the Kerberos information for the protocol.
+   * @param protocol protocol class being queried
+   * @param conf configuration (not read by this implementation)
+   * @return a {@link KerberosInfo} whose server principal value is
+   * {@link SliderXmlConfKeys#KEY_KERBEROS_PRINCIPAL} and whose client
+   * principal is null; or null if the protocol is not
+   * {@link SliderClusterProtocolPB}
+   */
+  @Override
+  public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    if (isSliderProtocol(protocol)) {
+      return new KerberosInfo() {
+
+        @Override
+        public Class<? extends Annotation> annotationType() {
+          return null;
+        }
+
+        @Override
+        public String serverPrincipal() {
+          return SliderXmlConfKeys.KEY_KERBEROS_PRINCIPAL;
+        }
+
+        @Override
+        public String clientPrincipal() {
+          return null;
+        }
+      };
+    }
+    return null;
+  }
+
+  /**
+   * Get the token information for the protocol.
+   * @param protocol protocol class being queried
+   * @param conf configuration (not read by this implementation)
+   * @return a {@link TokenInfo} naming {@link ClientToAMTokenSelector} as
+   * the token selector; or null if the protocol is not
+   * {@link SliderClusterProtocolPB}
+   */
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    if (isSliderProtocol(protocol)) {
+      return new TokenInfo() {
+
+        @Override
+        public Class<? extends Annotation> annotationType() {
+          return null;
+        }
+
+        @Override
+        public Class<? extends TokenSelector<? extends TokenIdentifier>>
+            value() {
+          return ClientToAMTokenSelector.class;
+        }
+
+        @Override
+        public String toString() {
+          return "SliderClusterProtocolPB token info";
+        }
+      };
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java
new file mode 100644
index 0000000..9a89c39
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/security/SecurityConfiguration.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.server.appmaster.security;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import static org.apache.slider.core.main.LauncherExitCodes.EXIT_UNAUTHORIZED;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.SliderXmlConfKeys;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.exceptions.SliderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Security settings of an application instance, read from the options of
+ * the {@link SliderKeys#COMPONENT_AM} component of the instance definition.
+ * <p>
+ * When the Hadoop cluster is secure, the settings are validated at
+ * construction time.
+ */
+public class SecurityConfiguration {
+
+  protected static final Logger log =
+      LoggerFactory.getLogger(SecurityConfiguration.class);
+  private final Configuration configuration;
+  private final AggregateConf instanceDefinition;
+  private final String clusterName;
+
+  /**
+   * Build and validate the security configuration.
+   * @param configuration Hadoop configuration used to determine whether the
+   * cluster is secure
+   * @param instanceDefinition instance definition holding the AM options
+   * @param clusterName name of the cluster
+   * @throws NullPointerException if any argument is null
+   * @throws SliderException if security is enabled and the settings are
+   * invalid
+   */
+  public SecurityConfiguration(Configuration configuration,
+      AggregateConf instanceDefinition,
+      String clusterName) throws SliderException {
+    Preconditions.checkNotNull(configuration);
+    Preconditions.checkNotNull(instanceDefinition);
+    Preconditions.checkNotNull(clusterName);
+    this.configuration = configuration;
+    this.instanceDefinition = instanceDefinition;
+    this.clusterName = clusterName;
+    validate();
+  }
+
+  /**
+   * Look up an option of the AM component in an instance definition.
+   * @param conf instance definition to query
+   * @param key option name
+   * @return the option value; null if the option is absent
+   */
+  private static String getAmOption(AggregateConf conf, String key) {
+    return conf.getAppConfOperations()
+        .getComponent(SliderKeys.COMPONENT_AM)
+        .get(key);
+  }
+
+  /**
+   * Validate the settings; a no-op unless security is enabled.
+   * <ol>
+   *   <li>If no principal is configured, a login user must be
+   *   retrievable.</li>
+   *   <li>A local keytab path and a keytab name must not both be set.</li>
+   * </ol>
+   * @throws SliderException with exit code {@code EXIT_UNAUTHORIZED} if a
+   * check fails
+   */
+  private void validate() throws SliderException {
+    if (!isSecurityEnabled()) {
+      return;
+    }
+    String principal = getAmOption(instanceDefinition,
+        SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL);
+    if (SliderUtils.isUnset(principal)) {
+      // if no login identity is available, fail
+      UserGroupInformation loginUser;
+      try {
+        loginUser = getLoginUser();
+      } catch (IOException e) {
+        throw new SliderException(EXIT_UNAUTHORIZED, e,
+            "No principal configured for the application and "
+            + "exception raised during retrieval of login user. "
+            + "Unable to proceed with application "
+            + "initialization. Please ensure a value "
+            + "for %s exists in the application "
+            + "configuration or the login issue is addressed",
+            SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL);
+      }
+      if (loginUser == null) {
+        throw new SliderException(EXIT_UNAUTHORIZED,
+            "No principal configured for the application "
+            + "and no login user found. "
+            + "Unable to proceed with application "
+            + "initialization. Please ensure a value "
+            + "for %s exists in the application "
+            + "configuration or the login issue is addressed",
+            SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL);
+      }
+    }
+    // ensure that either local or distributed keytab mechanism is enabled,
+    // but not both
+    String keytabFullPath = getAmOption(instanceDefinition,
+        SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
+    String keytabName = getAmOption(instanceDefinition,
+        SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+    if (SliderUtils.isSet(keytabFullPath) && SliderUtils.isSet(keytabName)) {
+      throw new SliderException(EXIT_UNAUTHORIZED,
+          "Both a keytab on the cluster host (%s) and a"
+          + " keytab to be retrieved from HDFS (%s) are"
+          + " specified. Please configure only one keytab"
+          + " retrieval mechanism.",
+          SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH,
+          SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+    }
+  }
+
+  /**
+   * Get the current login user. This is the single point through which the
+   * class obtains the login identity, so a subclass can override it.
+   * @return the login user
+   * @throws IOException on a failure to retrieve the login user
+   */
+  protected UserGroupInformation getLoginUser() throws IOException {
+    return UserGroupInformation.getLoginUser();
+  }
+
+  /**
+   * Query whether the Hadoop cluster is secure.
+   * @return the result of {@link SliderUtils#isHadoopClusterSecure} on the
+   * configuration supplied at construction
+   */
+  public boolean isSecurityEnabled () {
+    return SliderUtils.isHadoopClusterSecure(configuration);
+  }
+
+  /**
+   * Get the principal to log in as.
+   * @return the configured principal; if none is set, the short user name of
+   * the login user
+   * @throws IOException on a failure to retrieve the login user
+   */
+  public String getPrincipal () throws IOException {
+    String principal = getAmOption(instanceDefinition,
+        SliderXmlConfKeys.KEY_KEYTAB_PRINCIPAL);
+    if (SliderUtils.isUnset(principal)) {
+      // go through the overridable accessor, as validate() does, so both
+      // code paths agree on the login identity
+      principal = getLoginUser().getShortUserName();
+      log.info("No principal set in the slider configuration. Will use AM login"
+          + " identity {} to attempt keytab-based login", principal);
+    }
+
+    return principal;
+  }
+
+  /**
+   * Query whether a keytab has been configured, either as a local path or
+   * as a keytab name. An option only counts when it is set according to
+   * {@link SliderUtils#isSet}, matching the checks made during validation
+   * and in {@link #getKeytabFile(AggregateConf)}.
+   * @return true if at least one of the two keytab options is set
+   */
+  public boolean isKeytabProvided() {
+    String keytabFullPath = getAmOption(instanceDefinition,
+        SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
+    String keytabName = getAmOption(instanceDefinition,
+        SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+    return SliderUtils.isSet(keytabFullPath) || SliderUtils.isSet(keytabName);
+  }
+
+  /**
+   * Resolve the keytab file to use for login.
+   * <p>
+   * If a local keytab path is set, that path is used. Otherwise the file is
+   * the configured keytab name under {@link SliderKeys#KEYTAB_DIR}. This
+   * method only builds the {@link File}; it does not fetch the keytab or
+   * check that it exists.
+   * @param instanceDefinition instance definition to read the keytab
+   * options from (this argument, not the one supplied at construction)
+   * @return the keytab file
+   * @throws SliderException if neither a keytab path nor a keytab name is
+   * set
+   * @throws IOException declared for callers; not raised by this
+   * implementation
+   */
+  public File getKeytabFile(AggregateConf instanceDefinition)
+      throws SliderException, IOException {
+    String keytabFullPath = getAmOption(instanceDefinition,
+        SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH);
+    if (SliderUtils.isSet(keytabFullPath)) {
+      log.info("Using host keytab file {} for login", keytabFullPath);
+      return new File(keytabFullPath);
+    }
+
+    String keytabName = getAmOption(instanceDefinition,
+        SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+    if (SliderUtils.isUnset(keytabName)) {
+      // new File(parent, null) would fail with a NullPointerException;
+      // report the missing configuration instead
+      throw new SliderException(EXIT_UNAUTHORIZED,
+          "No keytab configured for the application. Please ensure a value"
+          + " for either %s or %s exists in the application configuration",
+          SliderXmlConfKeys.KEY_AM_KEYTAB_LOCAL_PATH,
+          SliderXmlConfKeys.KEY_AM_LOGIN_KEYTAB_NAME);
+    }
+    log.info("No host keytab file path specified. Will attempt to retrieve"
+        + " keytab file {} as a local resource for the container",
+        keytabName);
+    // the keytab is looked for by name in the keytab directory
+    return new File(SliderKeys.KEYTAB_DIR, keytabName);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/82fdd408/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AbstractClusterServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AbstractClusterServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AbstractClusterServices.java
new file mode 100644
index 0000000..54f384b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/appmaster/state/AbstractClusterServices.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.state;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+
+/**
+ * Cluster services offered by the YARN infrastructure.
+ */
+public abstract class AbstractClusterServices {
+
+ private final DefaultResourceCalculator
+ defaultResourceCalculator = new DefaultResourceCalculator();
+
+ /**
+ * Create a resource for requests
+ * @return a resource which can be built up.
+ */
+ public abstract Resource newResource();
+
+ public abstract Resource newResource(int memory, int cores);
+
+ /**
+ * Normalise memory, CPU and other resources according to the YARN AM-supplied
+ * values and the resource calculator in use (currently hard-coded to the
+ * {@link DefaultResourceCalculator}.
+ * Those resources which aren't normalized (currently: CPU) are left
+ * as is.
+ * @param resource resource requirements of a role
+ * @param minR minimum values of this queue
+ * @param maxR max values of this queue
+ * @return a normalized value.
+ */
+ public Resource normalize(Resource resource, Resource minR, Resource maxR) {
+ Preconditions.checkArgument(resource != null, "null resource");
+ Preconditions.checkArgument(minR != null, "null minR");
+ Preconditions.checkArgument(maxR != null, "null maxR");
+
+ Resource normalize = defaultResourceCalculator.normalize(resource, minR,
+ maxR, minR);
+ return newResource(normalize.getMemory(), resource.getVirtualCores());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org