You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2017/09/12 00:36:10 UTC
[68/84] [abbrv] hadoop git commit: YARN-7091. Rename application to
service in yarn-native-services. Contributed by Jian He
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
new file mode 100644
index 0000000..cb7131e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
+import org.apache.hadoop.yarn.service.ContainerFailureTracker;
+import org.apache.hadoop.yarn.service.ServiceContext;
+import org.apache.hadoop.yarn.service.ServiceScheduler;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
+import org.apache.hadoop.yarn.service.ServiceMetrics;
+import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
+import org.apache.hadoop.yarn.service.monitor.probe.Probe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*;
+import static org.apache.hadoop.yarn.service.component.ComponentEventType.*;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
+import static org.apache.hadoop.yarn.service.component.ComponentState.*;
+import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.CONTAINER_FAILURE_THRESHOLD;
+
+/**
+ * Runtime representation of one component of a YARN service. It owns the
+ * component's instances, requests containers from the RM, assigns allocated
+ * containers to pending instances and keeps the component metrics up to date.
+ * State changes (INIT, FLEXING, STABLE) are driven by {@link ComponentEvent}s
+ * through a state machine; {@link #handle} applies them under a write lock.
+ */
+public class Component implements EventHandler<ComponentEvent> {
+  private static final Logger LOG = LoggerFactory.getLogger(Component.class);
+
+  private org.apache.hadoop.yarn.service.api.records.Component componentSpec;
+  private long allocateId;
+  private Priority priority;
+  private ServiceMetrics componentMetrics;
+  private ServiceScheduler scheduler;
+  private ServiceContext context;
+  private AMRMClientAsync<ContainerRequest> amrmClient;
+  private AtomicLong instanceIdCounter = new AtomicLong();
+  private Map<ComponentInstanceId, ComponentInstance> compInstances =
+      new ConcurrentHashMap<>();
+  // component instances to be assigned with a container
+  private List<ComponentInstance> pendingInstances = new LinkedList<>();
+  private ContainerFailureTracker failureTracker;
+  private Probe probe;
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+  public int maxContainerFailurePerComp;
+  // The number of containers failed since last reset. This excludes preempted,
+  // disk_failed containers etc. This will be reset to 0 periodically.
+  public AtomicInteger currentContainerFailure = new AtomicInteger(0);
+
+  private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
+      stateMachine;
+  private AsyncDispatcher compInstanceDispatcher;
+  private static final StateMachineFactory<Component, ComponentState,
+      ComponentEventType, ComponentEvent> stateMachineFactory =
+      new StateMachineFactory<Component, ComponentState, ComponentEventType,
+          ComponentEvent>(INIT)
+          // INIT goes to FLEXING once containers are requested. It stays in
+          // INIT when the flex is deferred because dependencies are not ready,
+          // so INIT must be a valid post-state of this arc.
+          .addTransition(INIT, EnumSet.of(INIT, STABLE, FLEXING),
+              FLEX, new FlexComponentTransition())
+
+          // container allocated by RM
+          .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
+              new ContainerAllocatedTransition())
+          // container launched on NM
+          .addTransition(FLEXING, EnumSet.of(STABLE, FLEXING),
+              CONTAINER_STARTED, new ContainerStartedTransition())
+          // container failed while flexing
+          .addTransition(FLEXING, FLEXING, CONTAINER_COMPLETED,
+              new ContainerCompletedTransition())
+          // Flex while previous flex is still in progress. A flex down or a
+          // no-op flex can leave the component STABLE, so STABLE must be a
+          // valid post-state as well.
+          .addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX,
+              new FlexComponentTransition())
+
+          // container failed while stable
+          .addTransition(STABLE, FLEXING, CONTAINER_COMPLETED,
+              new ContainerCompletedTransition())
+          // Ignore surplus container
+          .addTransition(STABLE, STABLE, CONTAINER_ALLOCATED,
+              new ContainerAllocatedTransition())
+          // Flex by user
+          // For flex up, go to FLEXING state
+          // For flex down, go to STABLE state
+          .addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
+              FLEX, new FlexComponentTransition())
+          .installTopology();
+
+  /**
+   * Creates the component from its spec and registers its metrics. It also
+   * creates one pending instance per container listed in the spec. The
+   * constructor does not request containers itself; the FLEX transition out
+   * of INIT does.
+   *
+   * @param component the component spec
+   * @param allocateId allocation request id, also used as the request priority
+   * @param context the service context providing the scheduler
+   */
+  public Component(
+      org.apache.hadoop.yarn.service.api.records.Component component,
+      long allocateId, ServiceContext context) {
+    this.allocateId = allocateId;
+    this.priority = Priority.newInstance((int) allocateId);
+    this.componentSpec = component;
+    componentMetrics = ServiceMetrics.register(component.getName(),
+        "Metrics for component " + component.getName());
+    componentMetrics
+        .tag("type", "Metrics type [component or service]", "component");
+    this.scheduler = context.scheduler;
+    this.context = context;
+    amrmClient = scheduler.getAmRMClient();
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
+    this.stateMachine = stateMachineFactory.make(this);
+    compInstanceDispatcher = scheduler.getCompInstanceDispatcher();
+    failureTracker =
+        new ContainerFailureTracker(context, this);
+    probe = MonitorUtils.getProbe(componentSpec.getReadinessCheck());
+    maxContainerFailurePerComp = componentSpec.getConfiguration()
+        .getPropertyInt(CONTAINER_FAILURE_THRESHOLD, 10);
+    createNumCompInstances(component.getNumberOfContainers());
+  }
+
+  /** Creates {@code count} new instances and queues them as pending. */
+  private void createNumCompInstances(long count) {
+    for (int i = 0; i < count; i++) {
+      createOneCompInstance();
+    }
+  }
+
+  /**
+   * Creates one instance with the next id from the counter. The instance is
+   * added to both the instance map and the pending list.
+   */
+  private void createOneCompInstance() {
+    ComponentInstanceId id =
+        new ComponentInstanceId(instanceIdCounter.getAndIncrement(),
+            componentSpec.getName());
+    ComponentInstance instance = new ComponentInstance(this, id);
+    compInstances.put(id, instance);
+    pendingInstances.add(instance);
+  }
+
+  /**
+   * Handles a FLEX event by changing the desired number of instances.
+   * <ul>
+   *   <li>If dependencies are not ready, the flex is deferred and the state
+   *   is unchanged.</li>
+   *   <li>In INIT, the desired containers are requested and the component
+   *   goes to FLEXING.</li>
+   *   <li>A flex up requests and creates the extra instances, then goes to
+   *   FLEXING.</li>
+   *   <li>A flex down destroys the most recent instances, then goes to
+   *   STABLE.</li>
+   *   <li>A no-op flex stays FLEXING until every desired container runs.</li>
+   * </ul>
+   */
+  private static class FlexComponentTransition implements
+      MultipleArcTransition<Component, ComponentEvent, ComponentState> {
+    @Override
+    public ComponentState transition(Component component,
+        ComponentEvent event) {
+      component.setDesiredContainers((int) event.getDesired());
+      if (!component.areDependenciesReady()) {
+        LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not"
+            + " satisfied.", component.getName());
+        return component.getState();
+      }
+      if (component.getState() == INIT) {
+        // This happens on init
+        LOG.info("[INIT COMPONENT " + component.getName() + "]: " + event
+            .getDesired() + " instances.");
+        component.requestContainers(event.getDesired());
+        return FLEXING;
+      }
+      long before = component.getComponentSpec().getNumberOfContainers();
+      long delta = event.getDesired() - before;
+      component.getComponentSpec().setNumberOfContainers(event.getDesired());
+      if (delta > 0) {
+        // Scale up
+        LOG.info("[FLEX UP COMPONENT " + component.getName() + "]: scaling up from "
+            + before + " to " + event.getDesired());
+        component.requestContainers(delta);
+        component.createNumCompInstances(delta);
+        return FLEXING;
+      } else if (delta < 0) {
+        delta = 0 - delta;
+        // scale down
+        LOG.info("[FLEX DOWN COMPONENT " + component.getName()
+            + "]: scaling down from " + before + " to " + event.getDesired());
+        List<ComponentInstance> list =
+            new ArrayList<>(component.compInstances.values());
+
+        // sort in Most recent -> oldest order, destroy most recent ones.
+        Collections.sort(list, Collections.reverseOrder());
+        for (int i = 0; i < delta; i++) {
+          ComponentInstance instance = list.get(i);
+          // remove the instance
+          component.compInstances.remove(instance.getCompInstanceId());
+          component.pendingInstances.remove(instance);
+          component.componentMetrics.containersFailed.incr();
+          component.componentMetrics.containersRunning.decr();
+          // decrement id counter
+          component.instanceIdCounter.decrementAndGet();
+          instance.destroy();
+        }
+        return STABLE;
+      } else {
+        LOG.info("[FLEX COMPONENT " + component.getName() + "]: already has " +
+            event.getDesired() + " instances, ignoring");
+        // A no-op flex while containers are still pending must not claim
+        // STABLE: STABLE has no CONTAINER_STARTED arc, so later starts would
+        // be rejected.
+        return checkIfStable(component);
+      }
+    }
+  }
+
+  /** Assigns a freshly allocated container to a pending instance. */
+  private static class ContainerAllocatedTransition extends BaseTransition {
+    @Override
+    public void transition(Component component, ComponentEvent event) {
+      component.assignContainerToCompInstance(event.getContainer());
+    }
+  }
+
+  /**
+   * Tells the instance to START and increments the running count. Goes to
+   * STABLE once the running count matches the spec.
+   */
+  private static class ContainerStartedTransition implements
+      MultipleArcTransition<Component, ComponentEvent, ComponentState> {
+
+    @Override
+    public ComponentState transition(Component component,
+        ComponentEvent event) {
+      component.compInstanceDispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(event.getInstance().getContainerId(),
+              START));
+      component.incRunningContainers();
+      return checkIfStable(component);
+    }
+  }
+
+  /**
+   * Returns STABLE when the running container count equals the spec's number
+   * of containers, and FLEXING otherwise.
+   */
+  private static ComponentState checkIfStable(Component component) {
+    // if desired == running
+    if (component.componentMetrics.containersRunning.value() == component
+        .getComponentSpec().getNumberOfContainers()) {
+      return STABLE;
+    } else {
+      return FLEXING;
+    }
+  }
+
+  /**
+   * Records a completed container in the metrics and re-queues its instance
+   * as pending. Then sends STOP, with the container status, to the instance.
+   */
+  private static class ContainerCompletedTransition extends BaseTransition {
+    @Override
+    public void transition(Component component, ComponentEvent event) {
+      component.updateMetrics(event.getStatus());
+
+      // add back to pending list
+      component.pendingInstances.add(event.getInstance());
+      LOG.info(
+          "[COMPONENT {}]: {} completed, num pending comp instances increased to {}.",
+          component.getName(), event.getStatus().getContainerId(),
+          component.pendingInstances.size());
+      component.compInstanceDispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(event.getStatus().getContainerId(),
+              STOP).setStatus(event.getStatus()));
+    }
+  }
+
+  /** Returns the metrics registered for this component. */
+  public ServiceMetrics getCompMetrics() {
+    return componentMetrics;
+  }
+
+  /**
+   * Gives an allocated container to the first pending instance and launches
+   * it. If no instance is pending, the container is released as surplus.
+   */
+  private void assignContainerToCompInstance(Container container) {
+    if (pendingInstances.size() == 0) {
+      LOG.info(
+          "[COMPONENT {}]: No pending component instance left, release surplus container {}",
+          getName(), container.getId());
+      scheduler.getAmRMClient().releaseAssignedContainer(container.getId());
+      componentMetrics.surplusContainers.incr();
+      scheduler.getServiceMetrics().surplusContainers.incr();
+      return;
+    }
+    ComponentInstance instance = pendingInstances.remove(0);
+    LOG.info(
+        "[COMPONENT {}]: {} allocated, num pending component instances reduced to {}",
+        getName(), container.getId(), pendingInstances.size());
+    instance.setContainer(container);
+    scheduler.addLiveCompInstance(container.getId(), instance);
+    LOG.info(
+        "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
+        getName(), container.getId(), instance.getCompInstanceName(),
+        container.getNodeId());
+    scheduler.getContainerLaunchService()
+        .launchCompInstance(scheduler.getApp(), instance, container);
+  }
+
+  /**
+   * Adds {@code count} container requests to the AM-RM client. Each request
+   * uses this component's resource, priority and allocation request id.
+   */
+  @SuppressWarnings({ "unchecked" })
+  public void requestContainers(long count) {
+    Resource resource = Resource
+        .newInstance(componentSpec.getResource().getMemoryMB(),
+            componentSpec.getResource().getCpus());
+
+    for (int i = 0; i < count; i++) {
+      //TODO Once YARN-5468 is done, use that for anti-affinity
+      ContainerRequest request =
+          ContainerRequest.newBuilder().capability(resource).priority(priority)
+              .allocationRequestId(allocateId).relaxLocality(true).build();
+      amrmClient.addContainerRequest(request);
+    }
+  }
+
+  /**
+   * Sets this component's desired container count to {@code n}. The
+   * service-level gauge is adjusted by the same delta.
+   */
+  private void setDesiredContainers(int n) {
+    // The delta must be taken against this component's previous value, not
+    // the service-wide gauge. That gauge is shared by every component (it
+    // presumably holds the sum of their desired counts). incr() with a
+    // negative delta lowers the gauge; decr() with a negative delta would
+    // raise it.
+    int delta = n - componentMetrics.containersDesired.value();
+    if (delta != 0) {
+      scheduler.getServiceMetrics().containersDesired.incr(delta);
+    }
+    componentMetrics.containersDesired.set(n);
+  }
+
+  /**
+   * Updates component and service metrics for a completed container.
+   * Non-successful exits also count as failures. Exits that count toward
+   * node blacklisting are also recorded against the host and the failure
+   * counter.
+   */
+  private void updateMetrics(ContainerStatus status) {
+    // Every completed container has stopped running, including successful
+    // ones. The instance is re-queued as pending and counted as running
+    // again when its next container starts, so skipping this for SUCCESS
+    // over-counts running containers.
+    decRunningContainers();
+
+    switch (status.getExitStatus()) {
+      case SUCCESS:
+        componentMetrics.containersSucceeded.incr();
+        scheduler.getServiceMetrics().containersSucceeded.incr();
+        return;
+      case PREEMPTED:
+        componentMetrics.containersPreempted.incr();
+        scheduler.getServiceMetrics().containersPreempted.incr();
+        break;
+      case DISKS_FAILED:
+        componentMetrics.containersDiskFailure.incr();
+        scheduler.getServiceMetrics().containersDiskFailure.incr();
+        break;
+      default:
+        break;
+    }
+
+    // containersFailed include preempted, disks_failed etc.
+    componentMetrics.containersFailed.incr();
+    scheduler.getServiceMetrics().containersFailed.incr();
+
+    if (Apps.shouldCountTowardsNodeBlacklisting(status.getExitStatus())) {
+      String host = scheduler.getLiveInstances().get(status.getContainerId())
+          .getNodeId().getHost();
+      failureTracker.incNodeFailure(host);
+      currentContainerFailure.getAndIncrement();
+    }
+  }
+
+  /**
+   * Returns true when every dependency has at least as many ready instances
+   * as it desires. A component with no dependencies is always ready.
+   * Dependencies that cannot be found are logged and skipped.
+   */
+  public boolean areDependenciesReady() {
+    List<String> dependencies = componentSpec.getDependencies();
+    if (SliderUtils.isEmpty(dependencies)) {
+      return true;
+    }
+    for (String dependency : dependencies) {
+      Component dependentComponent =
+          scheduler.getAllComponents().get(dependency);
+      if (dependentComponent == null) {
+        LOG.error("Couldn't find dependency {} for {} (should never happen)",
+            dependency, getName());
+        continue;
+      }
+      if (dependentComponent.getNumReadyInstances() < dependentComponent
+          .getNumDesiredInstances()) {
+        LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}"
+                + " instances are ready.", getName(), dependency,
+            dependentComponent.getNumReadyInstances(),
+            dependentComponent.getNumDesiredInstances());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void incRunningContainers() {
+    componentMetrics.containersRunning.incr();
+    scheduler.getServiceMetrics().containersRunning.incr();
+  }
+
+  public void incContainersReady() {
+    componentMetrics.containersReady.incr();
+  }
+
+  public void decContainersReady() {
+    componentMetrics.containersReady.decr();
+  }
+
+  private void decRunningContainers() {
+    componentMetrics.containersRunning.decr();
+    scheduler.getServiceMetrics().containersRunning.decr();
+  }
+
+  public int getNumReadyInstances() {
+    return componentMetrics.containersReady.value();
+  }
+
+  public int getNumRunningInstances() {
+    return componentMetrics.containersRunning.value();
+  }
+
+  public int getNumDesiredInstances() {
+    return componentMetrics.containersDesired.value();
+  }
+
+  /** Returns the live (not copied) map of this component's instances. */
+  public Map<ComponentInstanceId, ComponentInstance> getAllComponentInstances() {
+    return compInstances;
+  }
+
+  public org.apache.hadoop.yarn.service.api.records.Component getComponentSpec() {
+    return this.componentSpec;
+  }
+
+  /** Resets this component's container failure counters to zero. */
+  public void resetCompFailureCount() {
+    LOG.info("[COMPONENT {}]: Reset container failure count from {} to 0.",
+        getName(), currentContainerFailure.get());
+    currentContainerFailure.set(0);
+    failureTracker.resetContainerFailures();
+  }
+
+  public Probe getProbe() {
+    return probe;
+  }
+
+  public Priority getPriority() {
+    return priority;
+  }
+
+  public long getAllocateId() {
+    return allocateId;
+  }
+
+  public String getName() {
+    return componentSpec.getName();
+  }
+
+  /** Returns the current state, read under the read lock. */
+  public ComponentState getState() {
+    this.readLock.lock();
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public ServiceScheduler getScheduler() {
+    return scheduler;
+  }
+
+  /**
+   * Applies one event to the state machine under the write lock. Invalid
+   * transitions are logged, not thrown. State changes are logged at INFO.
+   */
+  @Override
+  public void handle(ComponentEvent event) {
+    // Acquire before entering try so that a failed lock() never leads to
+    // unlock() of a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      ComponentState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitionException e) {
+        LOG.error(MessageFormat.format("[COMPONENT {0}]: Invalid event {1} at {2}",
+            componentSpec.getName(), event.getType(), oldState), e);
+      }
+      if (oldState != getState()) {
+        LOG.info("[COMPONENT {}] Transitioned from {} to {} on {} event.",
+            componentSpec.getName(), oldState, getState(), event.getType());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /** No-op single-arc transition that concrete transitions override. */
+  private static class BaseTransition implements
+      SingleArcTransition<Component, ComponentEvent> {
+
+    @Override
+    public void transition(Component component, ComponentEvent event) {
+    }
+  }
+
+  public ServiceContext getContext() {
+    return context;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
new file mode 100644
index 0000000..d93dcf1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+
+/**
+ * Event consumed by a {@link org.apache.hadoop.yarn.service.component.Component}.
+ * The component name and event type are fixed when the event is created.
+ * The optional payload (desired count, allocated container, affected
+ * instance, container status) is attached through chainable setters that
+ * return {@code this}.
+ */
+public class ComponentEvent extends AbstractEvent<ComponentEventType> {
+  private final String name;
+  private final ComponentEventType type;
+
+  // Optional payload: 0 / null until a producer sets it.
+  private long desired;
+  private Container container;
+  private ComponentInstance instance;
+  private ContainerStatus status;
+
+  /**
+   * @param name name of the target component
+   * @param type kind of event; also handed to {@link AbstractEvent}
+   */
+  public ComponentEvent(String name, ComponentEventType type) {
+    super(type);
+    this.type = type;
+    this.name = name;
+  }
+
+  /** @return the name of the target component */
+  public String getName() {
+    return name;
+  }
+
+  /** @return the kind of event */
+  public ComponentEventType getType() {
+    return type;
+  }
+
+  /** @return the desired instance count carried by this event */
+  public long getDesired() {
+    return desired;
+  }
+
+  /** @return the container carried by this event, or null if unset */
+  public Container getContainer() {
+    return container;
+  }
+
+  /** @return the component instance carried by this event, or null if unset */
+  public ComponentInstance getInstance() {
+    return instance;
+  }
+
+  /** @return the container status carried by this event, or null if unset */
+  public ContainerStatus getStatus() {
+    return status;
+  }
+
+  /** Attaches the desired instance count; returns this event for chaining. */
+  public ComponentEvent setDesired(long desired) {
+    this.desired = desired;
+    return this;
+  }
+
+  /** Attaches a container; returns this event for chaining. */
+  public ComponentEvent setContainer(Container container) {
+    this.container = container;
+    return this;
+  }
+
+  /** Attaches a component instance; returns this event for chaining. */
+  public ComponentEvent setInstance(ComponentInstance instance) {
+    this.instance = instance;
+    return this;
+  }
+
+  /** Attaches a container status; returns this event for chaining. */
+  public ComponentEvent setStatus(ContainerStatus status) {
+    this.status = status;
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
new file mode 100644
index 0000000..6729699
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component;
+
+/** Kinds of events that drive a component's state machine. */
+public enum ComponentEventType {
+  /** Change the desired number of component instances. */
+  FLEX,
+  /** A container was allocated by the RM. */
+  CONTAINER_ALLOCATED,
+  /** A container was launched on its NodeManager. */
+  CONTAINER_STARTED,
+  /** A container has completed (exited, failed, preempted, ...). */
+  CONTAINER_COMPLETED
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
new file mode 100644
index 0000000..a5f9ff4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component;
+
+/** Lifecycle states of a component's state machine. */
+public enum ComponentState {
+  /** Created; the initial flex has not been performed yet. */
+  INIT,
+  /** The instance count is being changed; not every desired container runs. */
+  FLEXING,
+  /** The running container count matches the desired count. */
+  STABLE
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
new file mode 100644
index 0000000..7d6525b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.service.ServiceScheduler;
+import org.apache.hadoop.yarn.service.api.records.ContainerState;
+import org.apache.hadoop.yarn.service.component.Component;
+import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BoundedAppender;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
+import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
+import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
+import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
+import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*;
+
+public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
+ Comparable<ComponentInstance> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ComponentInstance.class);
+
+ private StateMachine<ComponentInstanceState, ComponentInstanceEventType,
+ ComponentInstanceEvent> stateMachine;
+ private Component component;
+ private final ReadLock readLock;
+ private final WriteLock writeLock;
+
+ private ComponentInstanceId compInstanceId = null;
+ private Path compInstanceDir;
+ private Container container;
+ private YarnRegistryViewForProviders yarnRegistryOperations;
+ private FileSystem fs;
+ private boolean timelineServiceEnabled = false;
+ private ServiceTimelinePublisher serviceTimelinePublisher;
+ private ServiceScheduler scheduler;
+ private BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
+ private volatile ScheduledFuture containerStatusFuture;
+ private volatile ContainerStatus status;
+ private long containerStartedTime = 0;
+ // This container object is used for rest API query
+ private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
+
+ private static final StateMachineFactory<ComponentInstance,
+ ComponentInstanceState, ComponentInstanceEventType, ComponentInstanceEvent>
+ stateMachineFactory =
+ new StateMachineFactory<ComponentInstance, ComponentInstanceState,
+ ComponentInstanceEventType, ComponentInstanceEvent>(INIT)
+ .addTransition(INIT, STARTED, START,
+ new ContainerStartedTransition())
+
+ //From Running
+ .addTransition(STARTED, INIT, STOP,
+ new ContainerStoppedTransition())
+ .addTransition(STARTED, READY, BECOME_READY,
+ new ContainerBecomeReadyTransition())
+
+ // FROM READY
+ .addTransition(READY, STARTED, BECOME_NOT_READY,
+ new ContainerBecomeNotReadyTransition())
+ .addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
+ .installTopology();
+
+
+
+ public ComponentInstance(Component component,
+ ComponentInstanceId compInstanceId) {
+ this.stateMachine = stateMachineFactory.make(this);
+ this.component = component;
+ this.compInstanceId = compInstanceId;
+ this.scheduler = component.getScheduler();
+ this.yarnRegistryOperations =
+ component.getScheduler().getYarnRegistryOperations();
+ this.serviceTimelinePublisher =
+ component.getScheduler().getServiceTimelinePublisher();
+ if (YarnConfiguration
+ .timelineServiceV2Enabled(component.getScheduler().getConfig())) {
+ this.timelineServiceEnabled = true;
+ }
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ this.readLock = lock.readLock();
+ this.writeLock = lock.writeLock();
+ this.fs = scheduler.getContext().fs.getFileSystem();
+ }
+
+ private static class ContainerStartedTransition extends BaseTransition {
+ @Override public void transition(ComponentInstance compInstance,
+ ComponentInstanceEvent event) {
+ // Query container status for ip and host
+ compInstance.containerStatusFuture =
+ compInstance.scheduler.executorService.scheduleAtFixedRate(
+ new ContainerStatusRetriever(compInstance.scheduler,
+ compInstance.getContainerId(), compInstance), 0, 1,
+ TimeUnit.SECONDS);
+
+ org.apache.hadoop.yarn.service.api.records.Container container =
+ new org.apache.hadoop.yarn.service.api.records.Container();
+ container.setId(compInstance.getContainerId().toString());
+ container.setLaunchTime(new Date());
+ container.setState(ContainerState.RUNNING_BUT_UNREADY);
+ container.setBareHost(compInstance.container.getNodeId().getHost());
+ container.setComponentName(compInstance.getCompInstanceName());
+ if (compInstance.containerSpec != null) {
+ // remove the previous container.
+ compInstance.getCompSpec().removeContainer(compInstance.containerSpec);
+ }
+ compInstance.containerSpec = container;
+ compInstance.getCompSpec().addContainer(container);
+ compInstance.containerStartedTime = System.currentTimeMillis();
+
+ if (compInstance.timelineServiceEnabled) {
+ compInstance.serviceTimelinePublisher
+ .componentInstanceStarted(container, compInstance);
+ }
+ }
+ }
+
+ private static class ContainerBecomeReadyTransition extends BaseTransition {
+ @Override
+ public void transition(ComponentInstance compInstance,
+ ComponentInstanceEvent event) {
+ compInstance.component.incContainersReady();
+ compInstance.containerSpec.setState(ContainerState.READY);
+ }
+ }
+
+ private static class ContainerBecomeNotReadyTransition extends BaseTransition {
+ @Override
+ public void transition(ComponentInstance compInstance,
+ ComponentInstanceEvent event) {
+ compInstance.component.decContainersReady();
+ compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
+ }
+ }
+
+ private static class ContainerStoppedTransition extends BaseTransition {
+ @Override
+ public void transition(ComponentInstance compInstance,
+ ComponentInstanceEvent event) {
+ // re-ask the failed container.
+ Component comp = compInstance.component;
+ comp.requestContainers(1);
+ LOG.info(compInstance.getCompInstanceId()
+ + ": Container completed. Requested a new container." + System
+ .lineSeparator() + " exitStatus={}, diagnostics={}.",
+ event.getStatus().getExitStatus(),
+ event.getStatus().getDiagnostics());
+ String containerDiag =
+ compInstance.getCompInstanceId() + ": " + event.getStatus()
+ .getDiagnostics();
+ compInstance.diagnostics.append(containerDiag + System.lineSeparator());
+
+ boolean shouldExit = false;
+ // check if it exceeds the failure threshold
+ if (comp.currentContainerFailure.get() > comp.maxContainerFailurePerComp) {
+ String exitDiag = MessageFormat.format(
+ "[COMPONENT {0}]: Failed {1} times, exceeded the limit - {2}. Shutting down now... "
+ + System.lineSeparator(),
+ comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp);
+ compInstance.diagnostics.append(exitDiag);
+ // append to global diagnostics that will be reported to RM.
+ comp.getScheduler().getDiagnostics().append(containerDiag);
+ comp.getScheduler().getDiagnostics().append(exitDiag);
+ LOG.warn(exitDiag);
+ shouldExit = true;
+ }
+
+ // clean up registry
+ // hdfs dir content will be overwritten when a new container gets started,
+ // so no need remove.
+ compInstance.scheduler.executorService
+ .submit(compInstance::cleanupRegistry);
+
+ // remove the failed ContainerId -> CompInstance mapping
+ comp.getScheduler().removeLiveCompInstance(event.getContainerId());
+
+ if (compInstance.timelineServiceEnabled) {
+ // record in ATS
+ compInstance.serviceTimelinePublisher
+ .componentInstanceFinished(compInstance,
+ event.getStatus().getExitStatus(), event.getStatus().getState(),
+ containerDiag);
+ }
+
+ compInstance.containerSpec.setState(ContainerState.STOPPED);
+ if (shouldExit) {
+ // Sleep for 5 seconds in hope that the state can be recorded in ATS.
+ // in case there's a client polling the comp state, it can be notified.
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted on sleep while exiting.", e);
+ }
+ ExitUtil.terminate(-1);
+ }
+ }
+ }
+
+ public ComponentInstanceState getState() {
+ this.readLock.lock();
+
+ try {
+ return this.stateMachine.getCurrentState();
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public void handle(ComponentInstanceEvent event) {
+ try {
+ writeLock.lock();
+ ComponentInstanceState oldState = getState();
+ try {
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitionException e) {
+ LOG.error(getCompInstanceId() + ": Invalid event " + event.getType() +
+ " at " + oldState, e);
+ }
+ if (oldState != getState()) {
+ LOG.info(getCompInstanceId() + " Transitioned from " + oldState + " to "
+ + getState() + " on " + event.getType() + " event");
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void setContainer(Container container) {
+ this.container = container;
+ this.compInstanceId.setContainerId(container.getId());
+ }
+
+ public String getCompInstanceName() {
+ return compInstanceId.getCompInstanceName();
+ }
+
+ public ContainerStatus getContainerStatus() {
+ return status;
+ }
+
+ public void updateContainerStatus(ContainerStatus status) {
+ this.status = status;
+ org.apache.hadoop.yarn.service.api.records.Container container =
+ getCompSpec().getContainer(getContainerId().toString());
+ if (container != null) {
+ container.setIp(StringUtils.join(",", status.getIPs()));
+ container.setHostname(status.getHost());
+ if (timelineServiceEnabled) {
+ serviceTimelinePublisher.componentInstanceUpdated(container);
+ }
+ }
+ updateServiceRecord(yarnRegistryOperations, status);
+ }
+
+ public ContainerId getContainerId() {
+ return container.getId();
+ }
+
+ public String getCompName() {
+ return compInstanceId.getCompName();
+ }
+
+ public void setCompInstanceDir(Path dir) {
+ this.compInstanceDir = dir;
+ }
+
+ public Component getComponent() {
+ return component;
+ }
+
+ public Container getContainer() {
+ return container;
+ }
+
+ public ComponentInstanceId getCompInstanceId() {
+ return compInstanceId;
+ }
+
+ public NodeId getNodeId() {
+ return this.container.getNodeId();
+ }
+
+ public org.apache.hadoop.yarn.service.api.records.Component getCompSpec() {
+ return component.getComponentSpec();
+ }
+
+ private static class BaseTransition implements
+ SingleArcTransition<ComponentInstance, ComponentInstanceEvent> {
+
+ @Override public void transition(ComponentInstance compInstance,
+ ComponentInstanceEvent event) {
+ }
+ }
+
+ public ProbeStatus ping() {
+ if (component.getProbe() == null) {
+ ProbeStatus status = new ProbeStatus();
+ status.setSuccess(true);
+ return status;
+ }
+ return component.getProbe().ping(this);
+ }
+
+ // Write service record into registry
+ private void updateServiceRecord(
+ YarnRegistryViewForProviders yarnRegistry, ContainerStatus status) {
+ ServiceRecord record = new ServiceRecord();
+ String containerId = status.getContainerId().toString();
+ record.set(YarnRegistryAttributes.YARN_ID, containerId);
+ record.description = getCompInstanceName();
+ record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
+ PersistencePolicies.CONTAINER);
+ record.set("yarn:ip", status.getIPs());
+ record.set("yarn:hostname", status.getHost());
+ try {
+ yarnRegistry
+ .putComponent(RegistryPathUtils.encodeYarnID(containerId), record);
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to update service record in registry: " + containerId + "");
+ }
+ }
+
+ // Release the container , cleanup registry, hdfs dir, and record in ATS
+ public void destroy() {
+ LOG.info(getCompInstanceId() + ": Flexed down by user, destroying.");
+ diagnostics.append(getCompInstanceId() + ": Flexed down by user");
+ if (container != null) {
+ scheduler.removeLiveCompInstance(container.getId());
+ component.getScheduler().getAmRMClient()
+ .releaseAssignedContainer(container.getId());
+ getCompSpec().removeContainer(containerSpec);
+ }
+ if (timelineServiceEnabled) {
+ serviceTimelinePublisher
+ .componentInstanceFinished(this, KILLED_BY_APPMASTER, COMPLETE,
+ diagnostics.toString());
+ }
+ scheduler.executorService.submit(this::cleanupRegistryAndCompHdfsDir);
+ }
+
+ private void cleanupRegistry() {
+ ContainerId containerId = getContainerId();
+ String cid = RegistryPathUtils.encodeYarnID(containerId.toString());
+ try {
+ yarnRegistryOperations.deleteComponent(getCompInstanceId(), cid);
+ } catch (IOException e) {
+ LOG.error(getCompInstanceId() + ": Failed to delete registry", e);
+ }
+ }
+
+ //TODO Maybe have a dedicated cleanup service.
+ public void cleanupRegistryAndCompHdfsDir() {
+ cleanupRegistry();
+ try {
+ if (compInstanceDir != null && fs.exists(compInstanceDir)) {
+ boolean deleted = fs.delete(compInstanceDir, true);
+ if (!deleted) {
+ LOG.error(getCompInstanceId()
+ + ": Failed to delete component instance dir: "
+ + compInstanceDir);
+ } else {
+ LOG.info(getCompInstanceId() + ": Deleted component instance dir: "
+ + compInstanceDir);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn(getCompInstanceId() + ": Failed to delete directory", e);
+ }
+ }
+
+ // Query container status until ip and hostname are available and update
+ // the service record into registry service
+ private static class ContainerStatusRetriever implements Runnable {
+ private ContainerId containerId;
+ private NodeId nodeId;
+ private NMClient nmClient;
+ private ComponentInstance instance;
+ ContainerStatusRetriever(ServiceScheduler scheduler,
+ ContainerId containerId, ComponentInstance instance) {
+ this.containerId = containerId;
+ this.nodeId = instance.getNodeId();
+ this.nmClient = scheduler.getNmClient().getClient();
+ this.instance = instance;
+ }
+ @Override public void run() {
+ ContainerStatus status = null;
+ try {
+ status = nmClient.getContainerStatus(containerId, nodeId);
+ } catch (Exception e) {
+ if (e instanceof YarnException) {
+ throw new YarnRuntimeException(
+ instance.compInstanceId + " Failed to get container status on "
+ + nodeId + " , cancelling.", e);
+ }
+ LOG.error(instance.compInstanceId + " Failed to get container status on "
+ + nodeId + ", will try again", e);
+ return;
+ }
+ if (SliderUtils.isEmpty(status.getIPs()) || SliderUtils
+ .isUnset(status.getHost())) {
+ return;
+ }
+ instance.updateContainerStatus(status);
+ LOG.info(
+ instance.compInstanceId + " IP = " + status.getIPs() + ", host = "
+ + status.getHost() + ", cancel container status retriever");
+ instance.containerStatusFuture.cancel(false);
+ }
+ }
+
+ @Override
+ public int compareTo(ComponentInstance to) {
+ long delta = containerStartedTime - to.containerStartedTime;
+ if (delta == 0) {
+ return getCompInstanceId().compareTo(to.getCompInstanceId());
+ } else if (delta < 0) {
+ return -1;
+ } else {
+ return 1;
+ }
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ ComponentInstance instance = (ComponentInstance) o;
+
+ if (containerStartedTime != instance.containerStartedTime)
+ return false;
+ return compInstanceId.equals(instance.compInstanceId);
+ }
+
+ @Override public int hashCode() {
+ int result = compInstanceId.hashCode();
+ result = 31 * result + (int) (containerStartedTime ^ (containerStartedTime
+ >>> 32));
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java
new file mode 100644
index 0000000..707b034
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEvent.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event addressed to a component instance, identified by its container id.
+ * A container status and a "should destroy" flag may be attached after
+ * construction.
+ */
+public class ComponentInstanceEvent
+    extends AbstractEvent<ComponentInstanceEventType> {
+
+  // Container this event refers to; fixed at construction.
+  private final ContainerId id;
+  // Attached via setStatus(); null until then.
+  private ContainerStatus status;
+  // Raised via setShouldDestroy(); starts out false.
+  private boolean shouldDestroy;
+
+  /**
+   * @param containerId container the event refers to
+   * @param componentInstanceEventType type of the event
+   */
+  public ComponentInstanceEvent(ContainerId containerId,
+      ComponentInstanceEventType componentInstanceEventType) {
+    super(componentInstanceEventType);
+    id = containerId;
+  }
+
+  /** @return the container id supplied at construction */
+  public ContainerId getContainerId() {
+    return id;
+  }
+
+  /** @return the attached container status, or null if none was set */
+  public ContainerStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Attaches a container status to this event.
+   *
+   * @return this event, for call chaining
+   */
+  public ComponentInstanceEvent setStatus(ContainerStatus status) {
+    this.status = status;
+    return this;
+  }
+
+  /** Marks this event as requesting destruction; cannot be reset. */
+  public void setShouldDestroy() {
+    this.shouldDestroy = true;
+  }
+
+  /** @return true once setShouldDestroy() has been called */
+  public boolean shouldDestroy() {
+    return this.shouldDestroy;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
new file mode 100644
index 0000000..1a880ba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+/**
+ * Types of {@code ComponentInstanceEvent} accepted by a component instance's
+ * state machine. Declaration order fixes ordinal(): START, STOP,
+ * BECOME_READY, BECOME_NOT_READY.
+ */
+public enum ComponentInstanceEventType {
+  START, STOP, BECOME_READY, BECOME_NOT_READY
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceId.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceId.java
new file mode 100644
index 0000000..14387ba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceId.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Identity of a component instance: the component name plus a numeric id
+ * within that component.
+ *
+ * The container currently backing the instance is kept only for display. It
+ * appears in toString() but takes no part in equals, hashCode or compareTo.
+ */
+public class ComponentInstanceId implements Comparable<ComponentInstanceId> {
+
+  // Renamed from "Id": private fields follow lowerCamelCase.
+  private final long id;
+  private final String name;
+  private ContainerId containerId;
+
+  /**
+   * @param id numeric id of the instance within its component
+   * @param name component name
+   */
+  public ComponentInstanceId(long id, String name) {
+    this.id = id;
+    this.name = name;
+  }
+
+  /** @return the numeric id of this instance within its component */
+  public long getId() {
+    return id;
+  }
+
+  /** @return the name of the component this instance belongs to */
+  public String getCompName() {
+    return name;
+  }
+
+  /** @return the instance name, formatted as {@code <compName>-<id>} */
+  public String getCompInstanceName() {
+    return getCompName() + "-" + getId();
+  }
+
+  /** Records the container currently backing this instance (display only). */
+  public void setContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+  }
+
+  @Override
+  public String toString() {
+    if (containerId == null) {
+      return "[COMPINSTANCE " + getCompInstanceName() + "]";
+    } else {
+      return "[COMPINSTANCE " + getCompInstanceName() + " : " + containerId + "]";
+    }
+  }
+
+  /** Equal when both id and component name match (names may be null). */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ComponentInstanceId that = (ComponentInstanceId) o;
+    if (getId() != that.getId()) {
+      return false;
+    }
+    return getCompName() != null ? getCompName().equals(that.getCompName()) :
+        that.getCompName() == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Long.hashCode(getId());
+    result = 31 * result + (getCompName() != null ? getCompName().hashCode() : 0);
+    return result;
+  }
+
+  /**
+   * Orders by component name, then by id, returning -1, 0 or 1.
+   *
+   * @throws NullPointerException if either component name is null
+   */
+  @Override
+  public int compareTo(ComponentInstanceId to) {
+    int byName = Integer.signum(this.getCompName().compareTo(to.getCompName()));
+    return byName != 0 ? byName : Long.compare(this.getId(), to.getId());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
new file mode 100644
index 0000000..f5de5cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.component.instance;
+
+/**
+ * States of a component instance's state machine. Declaration order fixes
+ * ordinal(): INIT, STARTED, READY, UPGRADING.
+ *
+ * NOTE(review): UPGRADING is declared, but no transition in
+ * ComponentInstance's state machine currently targets it.
+ */
+public enum ComponentInstanceState {
+  INIT, STARTED, READY, UPGRADING
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
new file mode 100644
index 0000000..6de2dc0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/RestApiConstants.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+/**
+ * Constants shared by the services REST API: endpoint paths, path-template
+ * parameter names, property names and service-specific error codes.
+ *
+ * The boxed Long/Integer constant types are kept for source compatibility
+ * with existing callers.
+ */
+public interface RestApiConstants {
+
+  // Rest endpoints
+  String CONTEXT_ROOT = "/ws/v1";
+  String VERSION = "/services/version";
+  String SERVICE_ROOT_PATH = "/services";
+  String SERVICE_PATH = "/services/{service_name}";
+  String COMPONENT_PATH = "/services/{service_name}/components/{component_name}";
+
+  // Names of the {...} parameters in the path templates above
+  String SERVICE_NAME = "service_name";
+  String COMPONENT_NAME = "component_name";
+
+  String DEFAULT_COMPONENT_NAME = "default";
+
+  // Property names for the REST service host and port.
+  String PROPERTY_REST_SERVICE_HOST = "REST_SERVICE_HOST";
+  String PROPERTY_REST_SERVICE_PORT = "REST_SERVICE_PORT";
+  // Sentinel lifetime meaning "unlimited" (uppercase L: lowercase l reads as 1).
+  Long DEFAULT_UNLIMITED_LIFETIME = -1L;
+
+  // Service-specific error codes (appear to be HTTP 404 + a 3-digit sub-code)
+  Integer ERROR_CODE_APP_DOES_NOT_EXIST = 404001;
+  Integer ERROR_CODE_APP_IS_NOT_RUNNING = 404002;
+  Integer ERROR_CODE_APP_SUBMITTED_BUT_NOT_RUNNING_YET = 404003;
+  Integer ERROR_CODE_APP_NAME_INVALID = 404004;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/SliderExitCodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/SliderExitCodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/SliderExitCodes.java
new file mode 100644
index 0000000..ee270cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/SliderExitCodes.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+import org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes;
+
+/**
+ * Process exit codes used by the YARN service tooling, layered on top of
+ * the generic codes in {@link LauncherExitCodes}.
+ */
+public interface SliderExitCodes extends LauncherExitCodes {
+
+  /**
+   * Starting point for the exit codes below; not itself an exit code: {@value}
+   */
+  int _EXIT_CODE_BASE = 64;
+
+  /**
+   * The service entered the failed state: {@value}
+   */
+  int EXIT_YARN_SERVICE_FAILED = 65;
+
+  /**
+   * The service was killed: {@value}
+   */
+  int EXIT_YARN_SERVICE_KILLED = 66;
+
+  /**
+   * Timeout on the monitoring client: {@value}
+   */
+  int EXIT_TIMED_OUT = 67;
+
+  /**
+   * The service finished with an error: {@value}
+   */
+  int EXIT_YARN_SERVICE_FINISHED_WITH_ERROR = 68;
+
+  /**
+   * The service instance is unknown: {@value}
+   */
+  int EXIT_UNKNOWN_INSTANCE = 69;
+
+  /**
+   * The service instance is in the wrong state for that operation: {@value}
+   */
+  int EXIT_BAD_STATE = 70;
+
+  /**
+   * A spawned master process failed: {@value}
+   */
+  int EXIT_PROCESS_FAILED = 71;
+
+  /**
+   * The instance failed: too many containers were failing, or some other
+   * threshold was reached: {@value}
+   */
+  int EXIT_DEPLOYMENT_FAILED = 72;
+
+  /**
+   * The service is live, and the requested operation does not work while
+   * the cluster is running: {@value}
+   */
+  int EXIT_APPLICATION_IN_USE = 73;
+
+  /**
+   * A service instance of that name already exists when an attempt is
+   * made to create a new instance: {@value}
+   */
+  int EXIT_INSTANCE_EXISTS = 75;
+
+  /**
+   * The configuration is invalid or incomplete: {@value}
+   */
+  int EXIT_BAD_CONFIGURATION = 77;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
new file mode 100644
index 0000000..1968e95
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+import org.apache.hadoop.yarn.service.api.records.Configuration;
+
+public class YarnServiceConf {
+
+ // Retry settings for the ServiceClient to talk to Service AppMaster
+ public static final String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms";
+ public static final String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms";
+
+ // Retry settings for container failures
+ public static final String CONTAINER_RETRY_MAX = "yarn.service.container-failure.retry.max";
+ public static final String CONTAINER_RETRY_INTERVAL = "yarn.service.container-failure.retry-interval";
+
+ public static final String AM_RESTART_MAX = "yarn.service.am-restart.max-attempts";
+ public static final String AM_RESOURCE_MEM = "yarn.service.am-resource.memory";
+ public static final long DEFAULT_KEY_AM_RESOURCE_MEM = 1024;
+
+ public static final String YARN_QUEUE = "yarn.service.queue";
+
+ public static final String API_SERVER_ADDRESS = "yarn.service.api-server.address";
+ public static final String DEFAULT_API_SERVER_ADDRESS = "0.0.0.0:";
+ public static final int DEFAULT_API_SERVER_PORT = 9191;
+
+ /**
+ * The yarn service base path:
+ * Defaults to HomeDir/.yarn/
+ */
+ public static final String YARN_SERVICE_BASE_PATH = "yarn.service.base.path";
+
+ //TODO rename
+ /** Declare that a keytab must be provided */
+ public static final String KEY_AM_LOGIN_KEYTAB_REQUIRED = "slider.am.login.keytab.required";
+ public static final String KEY_AM_LOGIN_KEYTAB_NAME = "slider.am.login.keytab.name";
+ public static final String KEY_HDFS_KEYTAB_DIR = "slider.hdfs.keytab.dir";
+ public static final String KEY_AM_KEYTAB_LOCAL_PATH = "slider.am.keytab.local.path";
+
+ /**
+ * maximum number of failed containers (in a single component)
+ * before the app exits
+ */
+ public static final String CONTAINER_FAILURE_THRESHOLD =
+ "yarn.service.container-failure-per-component.threshold";
+ /**
+ * Maximum number of container failures on a node before the node is blacklisted
+ */
+ public static final String NODE_BLACKLIST_THRESHOLD =
+ "yarn.service.node-blacklist.threshold";
+
+ /**
+ * The failure count for CONTAINER_FAILURE_THRESHOLD and NODE_BLACKLIST_THRESHOLD
+ * gets reset periodically, the unit is seconds.
+ */
+ public static final String CONTAINER_FAILURE_WINDOW =
+ "yarn.service.failure-count-reset.window";
+
+ /**
+ * interval between readiness checks.
+ */
+ public static final String READINESS_CHECK_INTERVAL = "yarn.service.readiness-check-interval.seconds";
+ public static final int DEFAULT_READINESS_CHECK_INTERVAL = 30; // seconds
+
+ /**
+ * Get long value for the property. First get from the userConf, if not
+ * present, get from systemConf.
+ *
+ * @param name name of the property
+ * @param defaultValue default value of the property, if it is not defined in
+ * userConf and systemConf.
+ * @param userConf Configuration provided by client in the JSON definition
+ * @param systemConf The YarnConfiguration in the system.
+ * @return long value for the property
+ */
+ public static long getLong(String name, long defaultValue,
+ Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) {
+ return userConf.getPropertyLong(name, systemConf.getLong(name, defaultValue));
+ }
+
+ public static int getInt(String name, int defaultValue,
+ Configuration userConf, org.apache.hadoop.conf.Configuration systemConf) {
+ return userConf.getPropertyInt(name, systemConf.getInt(name, defaultValue));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
new file mode 100644
index 0000000..e5ed703
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.conf;
+
+public interface YarnServiceConstants {
+
+ /**
+ * The path under which cluster and temp data are stored
+ */
+ String SERVICE_BASE_DIRECTORY = ".yarn";
+
+ /**
+ * The paths under which Service AM dependency libraries are stored
+ */
+ String DEPENDENCY_LOCALIZED_DIR_LINK = "service_dep";
+ String DEPENDENCY_DIR = "/yarn-services/%s/";
+ String DEPENDENCY_TAR_GZ_FILE_NAME = "service-dep";
+ String DEPENDENCY_TAR_GZ_FILE_EXT = ".tar.gz";
+ String DEPENDENCY_DIR_PERMISSIONS = "755";
+
+ /**
+ * Service type for YARN service
+ */
+ String APP_TYPE = "yarn-service";
+
+ String KEYTAB_DIR = "keytabs";
+ String RESOURCE_DIR = "resources";
+
+
+ String SERVICES_DIRECTORY = "services";
+
+ /**
+ * JVM property to define the service lib directory;
+ * this is set by the yarn.sh script
+ */
+ String PROPERTY_LIB_DIR = "service.libdir";
+
+ /**
+ * name of generated dir for this conf
+ */
+ String SUBMITTED_CONF_DIR = "conf";
+
+ /**
+ * Service AM log4j file name
+ */
+ String YARN_SERVICE_LOG4J_FILENAME = "yarnservice-log4j.properties";
+
+ /**
+ * Log4j sysprop to name the resource
+ */
+ String SYSPROP_LOG4J_CONFIGURATION = "log4j.configuration";
+
+ /**
+ * sysprop for Service AM log4j directory
+ */
+ String SYSPROP_LOG_DIR = "LOG_DIR";
+
+ String TMP_DIR_PREFIX = "tmp";
+
+
+ String SERVICE_CORE_JAR = "yarn-service-core.jar";
+
+ String STDOUT_AM = "serviceam-out.txt";
+ String STDERR_AM = "serviceam-err.txt";
+
+ String HADOOP_USER_NAME = "HADOOP_USER_NAME";
+
+ String APP_CONF_DIR = "conf";
+
+ String APP_LIB_DIR = "lib";
+
+ String OUT_FILE = "stdout.txt";
+ String ERR_FILE = "stderr.txt";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
new file mode 100644
index 0000000..e4eae20
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.containerlaunch;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.apache.hadoop.yarn.service.provider.docker.DockerKeys.DEFAULT_DOCKER_NETWORK;
+
+/**
+ * Launcher of applications: base class. Callers accumulate commands,
+ * environment variables, local resources and credentials, then call
+ * {@link #completeContainerLaunch()} to copy them into a
+ * {@link ContainerLaunchContext}.
+ */
+public class AbstractLauncher {
+  private static final Logger log =
+      LoggerFactory.getLogger(AbstractLauncher.class);
+  public static final String CLASSPATH = "CLASSPATH";
+  /**
+   * Filesystem to use for the launch
+   */
+  protected final CoreFileSystem coreFileSystem;
+  /**
+   * Env vars; set up at final launch stage
+   */
+  protected final Map<String, String> envVars = new HashMap<>();
+  protected final ContainerLaunchContext containerLaunchContext =
+      Records.newRecord(ContainerLaunchContext.class);
+  protected final List<String> commands = new ArrayList<>(20);
+  protected final Map<String, LocalResource> localResources = new HashMap<>();
+  /**
+   * Local resource sub-path to mount path; exported in docker mode through
+   * YARN_CONTAINER_RUNTIME_DOCKER_LOCAL_RESOURCE_MOUNTS.
+   */
+  protected final Map<String, String> mountPaths = new HashMap<>();
+  private final Map<String, ByteBuffer> serviceData = new HashMap<>();
+  // security
+  protected final Credentials credentials;
+  protected boolean yarnDockerMode = false;
+  protected String dockerImage;
+  protected String dockerNetwork = DEFAULT_DOCKER_NETWORK;
+  protected String dockerHostname;
+  // "true"/"false" once setRunPrivilegedContainer is called; null until then
+  protected String runPrivilegedContainer;
+
+  /**
+   * Create instance.
+   * @param coreFileSystem filesystem
+   * @param credentials initial set of credentials -null is permitted
+   */
+  public AbstractLauncher(
+      CoreFileSystem coreFileSystem,
+      Credentials credentials) {
+    this.coreFileSystem = coreFileSystem;
+    this.credentials = credentials != null ? credentials : new Credentials();
+  }
+
+  /**
+   * Enable or disable the docker runtime environment variables that
+   * {@link #completeContainerLaunch()} adds.
+   * @param yarnDockerMode true to launch through the docker runtime
+   */
+  public void setYarnDockerMode(boolean yarnDockerMode) {
+    this.yarnDockerMode = yarnDockerMode;
+  }
+
+  /**
+   * Get the env vars to work on
+   * @return the live (mutable) map of env vars
+   */
+  public Map<String, String> getEnv() {
+    return envVars;
+  }
+
+  /**
+   * Get the launch commands.
+   * @return the live list of commands
+   */
+  public List<String> getCommands() {
+    return commands;
+  }
+
+  /**
+   * Register a local resource.
+   * @param subPath name of the resource
+   * @param resource the resource
+   */
+  public void addLocalResource(String subPath, LocalResource resource) {
+    localResources.put(subPath, resource);
+  }
+
+  /**
+   * Register a local resource together with the path it is mounted at.
+   * @param subPath name of the resource
+   * @param resource the resource
+   * @param mountPath mount path used in docker mode
+   */
+  public void addLocalResource(String subPath, LocalResource resource, String mountPath) {
+    localResources.put(subPath, resource);
+    mountPaths.put(subPath, mountPath);
+  }
+
+  /**
+   * Accessor to the credentials
+   * @return the credentials associated with this launcher
+   */
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+  /**
+   * Append a command to the launch command list.
+   * @param cmd command fragment
+   */
+  public void addCommand(String cmd) {
+    commands.add(cmd);
+  }
+
+  /**
+   * Complete the launch context (copy in env vars, etc).
+   * @return the container to launch
+   * @throws IOException if the credentials cannot be marshalled
+   */
+  public ContainerLaunchContext completeContainerLaunch() throws IOException {
+
+    if (log.isDebugEnabled()) {
+      // only pay for the join when it will actually be logged
+      log.debug("Completed setting up container command {}",
+          SliderUtils.join(commands, " ", false));
+    }
+    containerLaunchContext.setCommands(commands);
+
+    //env variables
+    if (log.isDebugEnabled()) {
+      log.debug("Environment variables");
+      for (Map.Entry<String, String> envPair : envVars.entrySet()) {
+        log.debug(" \"{}\"=\"{}\"", envPair.getKey(), envPair.getValue());
+      }
+    }
+    containerLaunchContext.setEnvironment(envVars);
+
+    //service data
+    if (log.isDebugEnabled()) {
+      log.debug("Service Data size");
+      for (Map.Entry<String, ByteBuffer> entry : serviceData.entrySet()) {
+        // remaining() works for direct and read-only buffers; array() throws
+        log.debug("\"{}\"=> {} bytes of data", entry.getKey(),
+            entry.getValue().remaining());
+      }
+    }
+    containerLaunchContext.setServiceData(serviceData);
+
+    // resources
+    dumpLocalResources();
+    containerLaunchContext.setLocalResources(localResources);
+
+    //tokens
+    log.debug("{} tokens", credentials.numberOfTokens());
+    containerLaunchContext.setTokens(CredentialUtils.marshallCredentials(
+        credentials));
+
+    if (yarnDockerMode) {
+      addDockerEnvironment(containerLaunchContext.getEnvironment());
+    }
+
+    return containerLaunchContext;
+  }
+
+  /**
+   * Add the docker runtime environment variables to the launch context
+   * environment.
+   * @param env environment map taken from the launch context; modified in place
+   */
+  private void addDockerEnvironment(Map<String, String> env) {
+    env.put("YARN_CONTAINER_RUNTIME_TYPE", "docker");
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", dockerImage);
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_NETWORK", dockerNetwork);
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_CONTAINER_HOSTNAME",
+        dockerHostname);
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_RUN_PRIVILEGED_CONTAINER",
+        runPrivilegedContainer);
+    // comma-separated list of "<sub-path>:<mount path>" pairs
+    List<String> mounts = new ArrayList<>(mountPaths.size());
+    for (Entry<String, String> mount : mountPaths.entrySet()) {
+      mounts.add(mount.getKey() + ":" + mount.getValue());
+    }
+    env.put("YARN_CONTAINER_RUNTIME_DOCKER_LOCAL_RESOURCE_MOUNTS",
+        String.join(",", mounts));
+    log.info("yarn docker env var has been set {}", env);
+  }
+
+  /**
+   * Attach a retry context that retries on all errors.
+   * @param maxRetries maximum number of retries
+   * @param retryInterval interval between retries
+   */
+  public void setRetryContext(int maxRetries, int retryInterval) {
+    ContainerRetryContext retryContext = ContainerRetryContext
+        .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, maxRetries,
+            retryInterval);
+    containerLaunchContext.setContainerRetryContext(retryContext);
+  }
+
+  /**
+   * Dump local resources at debug level
+   */
+  private void dumpLocalResources() {
+    if (log.isDebugEnabled()) {
+      log.debug("{} resources: ", localResources.size());
+      for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
+        log.debug("{}={}", entry.getKey(),
+            SliderUtils.stringify(entry.getValue().getResource()));
+      }
+    }
+  }
+
+  /**
+   * This is critical for an insecure cluster -it passes
+   * down the username to YARN, and so gives the code running
+   * in containers the rights it needs to work with
+   * data.
+   * @throws IOException problems working with current user
+   */
+  protected void propagateUsernameInInsecureCluster() throws IOException {
+    //insecure cluster: propagate user name via env variable
+    String userName = UserGroupInformation.getCurrentUser().getUserName();
+    envVars.put(YarnServiceConstants.HADOOP_USER_NAME, userName);
+  }
+
+  /**
+   * Utility method to set up the classpath
+   * @param classpath classpath to use
+   */
+  public void setClasspath(ClasspathConstructor classpath) {
+    setEnv(CLASSPATH, classpath.buildClasspath());
+  }
+
+  /**
+   * Set an environment variable in the launch context
+   * @param var variable name
+   * @param value value (must be non null)
+   * @throws IllegalArgumentException if either argument is null
+   */
+  public void setEnv(String var, String value) {
+    Preconditions.checkArgument(var != null, "null variable name");
+    Preconditions.checkArgument(value != null, "null value");
+    envVars.put(var, value);
+  }
+
+  /**
+   * Copy all entries of {@code map} into the env vars.
+   * @param map variables to add; existing keys are overwritten
+   */
+  public void putEnv(Map<String, String> map) {
+    envVars.putAll(map);
+  }
+
+  public void setDockerImage(String dockerImage) {
+    this.dockerImage = dockerImage;
+  }
+
+  public void setDockerNetwork(String dockerNetwork) {
+    this.dockerNetwork = dockerNetwork;
+  }
+
+  public void setDockerHostname(String dockerHostname) {
+    this.dockerHostname = dockerHostname;
+  }
+
+  /**
+   * Record whether the docker container runs privileged.
+   * @param runPrivilegedContainer flag, stored as "true" or "false"
+   */
+  public void setRunPrivilegedContainer(boolean runPrivilegedContainer) {
+    this.runPrivilegedContainer = Boolean.toString(runPrivilegedContainer);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea399dfd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java
new file mode 100644
index 0000000..22b3877
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ClasspathConstructor.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.containerlaunch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * build a classpath -allows for entries to be injected in front of
+ * YARN classpath as well as behind, adds appropriate separators,
+ * extraction of local classpath, etc.
+ */
+public class ClasspathConstructor {
+
+ public static final String CLASS_PATH_SEPARATOR = ApplicationConstants.CLASS_PATH_SEPARATOR;
+ private final List<String> pathElements = new ArrayList<>();
+
+ public ClasspathConstructor() {
+ }
+
+
+ /**
+ * Get the list of JARs from the YARN settings
+ * @param config configuration
+ */
+ public List<String> yarnApplicationClasspath(Configuration config) {
+ String[] cp = config.getTrimmedStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH);
+ return cp != null ? Arrays.asList(cp) : new ArrayList<String>(0);
+
+ }
+
+
+ @Override
+ public String toString() {
+ return buildClasspath();
+ }
+
+ public String buildClasspath() {
+ return SliderUtils.join(pathElements,
+ CLASS_PATH_SEPARATOR,
+ false);
+ }
+
+ /**
+ * Get a copy of the path list
+ * @return the JARs
+ */
+ public List<String> getPathElements() {
+ return Collections.unmodifiableList(pathElements);
+ }
+
+ /**
+ * Append an entry
+ * @param path path
+ */
+ public void append(String path) {
+ pathElements.add(path);
+ }
+
+ /**
+ * Insert a path at the front of the list. This places it ahead of
+ * the standard YARN artifacts
+ * @param path path to the JAR. Absolute or relative -on the target
+ * system
+ */
+ public void insert(String path) {
+ pathElements.add(0, path);
+ }
+
+ public void appendAll(Collection<String> paths) {
+ pathElements.addAll(paths);
+ }
+
+ public void insertAll(Collection<String> paths) {
+ pathElements.addAll(0, paths);
+ }
+
+
+ public void addLibDir(String pathToLibDir) {
+ append(buildLibDir(pathToLibDir));
+ }
+
+ public void insertLibDir(String pathToLibDir) {
+ insert(buildLibDir(pathToLibDir));
+ }
+
+ public void addClassDirectory(String pathToDir) {
+ append(appendDirectoryTerminator(pathToDir));
+ }
+
+ public void insertClassDirectory(String pathToDir) {
+ insert(buildLibDir(appendDirectoryTerminator(pathToDir)));
+ }
+
+
+ public void addRemoteClasspathEnvVar() {
+ append(ApplicationConstants.Environment.CLASSPATH.$$());
+ }
+
+
+ public void insertRemoteClasspathEnvVar() {
+ append(ApplicationConstants.Environment.CLASSPATH.$$());
+ }
+
+
+ /**
+ * Build a lib dir path
+ * @param pathToLibDir path to the directory; may or may not end with a
+ * trailing space
+ * @return a path to a lib dir that is compatible with the java classpath
+ */
+ public String buildLibDir(String pathToLibDir) {
+ String dir = appendDirectoryTerminator(pathToLibDir);
+ dir += "*";
+ return dir;
+ }
+
+ private String appendDirectoryTerminator(String pathToLibDir) {
+ String dir = pathToLibDir.trim();
+ if (!dir.endsWith("/")) {
+ dir += "/";
+ }
+ return dir;
+ }
+
+ /**
+ * Split a classpath. This uses the local path separator so MUST NOT
+ * be used to work with remote classpaths
+ * @param localpath local path
+ * @return a splite
+ */
+ public Collection<String> splitClasspath(String localpath) {
+ String separator = System.getProperty("path.separator");
+ return StringUtils.getStringCollection(localpath, separator);
+ }
+
+ /**
+ * Get the local JVM classpath split up
+ * @return the list of entries on the JVM classpath env var
+ */
+ public Collection<String> localJVMClasspath() {
+ return splitClasspath(System.getProperty("java.class.path"));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org