You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/12/12 23:59:23 UTC

[48/50] hadoop git commit: YARN-7565. Yarn service pre-maturely releases the container after AM restart. Contributed by Chandni Singh

YARN-7565. Yarn service pre-maturely releases the container after AM restart. Contributed by Chandni Singh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3ebe6a78
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3ebe6a78
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3ebe6a78

Branch: refs/heads/HDFS-7240
Commit: 3ebe6a7819292ce6bd557e36137531b59890c845
Parents: 06f0eb2
Author: Jian He <ji...@apache.org>
Authored: Tue Dec 12 13:35:56 2017 -0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Dec 12 13:35:56 2017 -0800

----------------------------------------------------------------------
 .../hadoop/yarn/service/ServiceScheduler.java   |  72 +++++++++-
 .../yarn/service/component/Component.java       |   8 ++
 .../component/instance/ComponentInstance.java   |  10 +-
 .../yarn/service/conf/YarnServiceConf.java      |  10 ++
 .../hadoop/yarn/service/MockServiceAM.java      | 144 ++++++++++++++++---
 .../hadoop/yarn/service/TestServiceAM.java      |  98 ++++++++++++-
 .../yarn/client/api/async/AMRMClientAsync.java  |  10 ++
 .../api/async/impl/AMRMClientAsyncImpl.java     |   5 +
 .../types/yarn/YarnRegistryAttributes.java      |   1 +
 .../markdown/yarn-service/Configurations.md     |   1 +
 10 files changed, 330 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index bea31cf..2697050 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventT
 import org.apache.hadoop.yarn.service.component.Component;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
 import org.apache.hadoop.yarn.service.component.ComponentEventType;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
 import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
 import org.apache.hadoop.yarn.service.provider.ProviderUtils;
@@ -137,6 +138,9 @@ public class ServiceScheduler extends CompositeService {
   private YarnRegistryViewForProviders yarnRegistryOperations;
   private ServiceContext context;
   private ContainerLaunchService containerLaunchService;
+  private final Map<ContainerId, ComponentInstance> unRecoveredInstances =
+      new ConcurrentHashMap<>();
+  private long containerRecoveryTimeout;
 
   public ServiceScheduler(ServiceContext context) {
     super(context.service.getName());
@@ -212,6 +216,9 @@ public class ServiceScheduler extends CompositeService {
     createConfigFileCache(context.fs.getFileSystem());
 
     createAllComponents();
+    containerRecoveryTimeout = getConfig().getInt(
+        YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS,
+        YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS);
   }
 
   protected YarnRegistryViewForProviders createYarnRegistryOperations(
@@ -320,7 +327,7 @@ public class ServiceScheduler extends CompositeService {
     }
     for (Container container : containersFromPrevAttempt) {
       LOG.info("Handling {} from previous attempt", container.getId());
-      ServiceRecord record = existingRecords.get(RegistryPathUtils
+      ServiceRecord record = existingRecords.remove(RegistryPathUtils
           .encodeYarnID(container.getId().toString()));
       if (record != null) {
         Component comp = componentsById.get(container.getAllocationRequestId());
@@ -337,6 +344,40 @@ public class ServiceScheduler extends CompositeService {
         amRMClient.releaseAssignedContainer(container.getId());
       }
     }
+
+    existingRecords.forEach((encodedContainerId, record) -> {
+      String componentName = record.get(YarnRegistryAttributes.YARN_COMPONENT);
+      if (componentName != null) {
+        Component component = componentsByName.get(componentName);
+        ComponentInstance compInstance = component.getComponentInstance(
+            record.description);
+        ContainerId containerId = ContainerId.fromString(record.get(
+            YarnRegistryAttributes.YARN_ID));
+        unRecoveredInstances.put(containerId, compInstance);
+        component.removePendingInstance(compInstance);
+      }
+    });
+
+    if (unRecoveredInstances.size() > 0) {
+      executorService.schedule(() -> {
+        synchronized (unRecoveredInstances) {
+          // after containerRecoveryTimeout, all the containers that haven't be
+          // recovered by the RM will released. The corresponding Component
+          // Instances are added to the pending queues of their respective
+          // component.
+          unRecoveredInstances.forEach((containerId, instance) -> {
+            LOG.info("{}, wait on container {} expired",
+                instance.getCompInstanceId(), containerId);
+            instance.cleanupRegistryAndCompHdfsDir(containerId);
+            Component component = componentsByName.get(instance.getCompName());
+            component.requestContainers(1);
+            component.reInsertPendingInstance(instance);
+            amRMClient.releaseAssignedContainer(containerId);
+          });
+          unRecoveredInstances.clear();
+        }
+      }, containerRecoveryTimeout, TimeUnit.MILLISECONDS);
+    }
   }
 
   private void initGlobalTokensForSubstitute(ServiceContext context) {
@@ -521,6 +562,35 @@ public class ServiceScheduler extends CompositeService {
       }
     }
 
+
+    @Override
+    public void onContainersReceivedFromPreviousAttempts(
+        List<Container> containers) {
+      if (containers == null || containers.isEmpty()) {
+        return;
+      }
+      for (Container container : containers) {
+        ComponentInstance compInstance;
+        synchronized (unRecoveredInstances) {
+          compInstance = unRecoveredInstances.remove(container.getId());
+        }
+        if (compInstance != null) {
+          Component component = componentsById.get(
+              container.getAllocationRequestId());
+          ComponentEvent event = new ComponentEvent(component.getName(),
+              CONTAINER_RECOVERED)
+              .setInstance(compInstance)
+              .setContainerId(container.getId())
+              .setContainer(container);
+          component.handle(event);
+        } else {
+          LOG.info("Not waiting to recover container {}, releasing",
+              container.getId());
+          amRMClient.releaseAssignedContainer(container.getId());
+        }
+      }
+    }
+
     @Override
     public void onContainersCompleted(List<ContainerStatus> statuses) {
       for (ContainerStatus status : statuses) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 4e05e5f..5189ab1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -107,6 +107,10 @@ public class Component implements EventHandler<ComponentEvent> {
           .addTransition(INIT, INIT, CONTAINER_RECOVERED,
               new ContainerRecoveredTransition())
 
+          // container recovered in AM heartbeat
+          .addTransition(FLEXING, FLEXING, CONTAINER_RECOVERED,
+              new ContainerRecoveredTransition())
+
           // container allocated by RM
           .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
               new ContainerAllocatedTransition())
@@ -309,6 +313,10 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
+  public void removePendingInstance(ComponentInstance instance) {
+    pendingInstances.remove(instance);
+  }
+
   public void reInsertPendingInstance(ComponentInstance instance) {
     pendingInstances.add(instance);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index 509f667..31fa5c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -39,15 +39,15 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.service.ServiceScheduler;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.component.Component;
+import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
+import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
+import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
+import org.apache.hadoop.yarn.service.utils.ServiceUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.BoundedAppender;
-import org.apache.hadoop.yarn.service.utils.ServiceUtils;
-import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
-import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
-import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,7 +62,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*;
 import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER;
-import static org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE;
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*;
 import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*;
 
@@ -398,6 +397,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     record.set(YARN_PERSISTENCE, PersistencePolicies.CONTAINER);
     record.set(YARN_IP, status.getIPs().get(0));
     record.set(YARN_HOSTNAME, status.getHost());
+    record.set(YARN_COMPONENT, component.getName());
     try {
       yarnRegistry
           .putComponent(RegistryPathUtils.encodeYarnID(containerId), record);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
index ea8904a..22926e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.service.api.records.Configuration;
 
 public class YarnServiceConf {
 
+  private static final String YARN_SERVICE_PREFIX = "yarn.service.";
+
   // Retry settings for the ServiceClient to talk to Service AppMaster
   public static final String CLIENT_AM_RETRY_MAX_WAIT_MS = "yarn.service.client-am.retry.max-wait-ms";
   public static final String CLIENT_AM_RETRY_MAX_INTERVAL_MS = "yarn.service.client-am.retry-interval-ms";
@@ -84,6 +86,14 @@ public class YarnServiceConf {
   public static final String JVM_OPTS = "yarn.service.am.java.opts";
 
   /**
+   * How long to wait until a container is considered dead.
+   */
+  public static final String CONTAINER_RECOVERY_TIMEOUT_MS =
+      YARN_SERVICE_PREFIX + "container-recovery.timeout.ms";
+
+  public static final int DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS = 120000;
+
+  /**
    * Get long value for the property. First get from the userConf, if not
    * present, get from systemConf.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
index 4298161..37b18fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockServiceAM.java
@@ -19,8 +19,13 @@
 package org.apache.hadoop.yarn.service;
 
 import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -42,15 +47,24 @@ import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
 import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
 
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class MockServiceAM extends ServiceMaster {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MockServiceAM.class);
+
   Service service;
   // The list of containers fed by tests to be returned on
   // AMRMClientCallBackHandler#onContainersAllocated
@@ -59,6 +73,16 @@ public class MockServiceAM extends ServiceMaster {
 
   final List<ContainerStatus> failedContainers =
       Collections.synchronizedList(new LinkedList<>());
+
+  private final List<Container> recoveredContainers =
+      Collections.synchronizedList(new LinkedList<>());
+
+  private final Map<String, ServiceRecord> registryComponents =
+      new ConcurrentHashMap<>();
+
+  private Map<ContainerId, ContainerStatus> containerStatuses =
+      new ConcurrentHashMap<>();
+
   public MockServiceAM(Service service) {
     super(service.getName());
     this.service = service;
@@ -75,7 +99,7 @@ public class MockServiceAM extends ServiceMaster {
   @Override
   protected Path getAppDir() {
     Path path = new Path(new Path("target", "apps"), service.getName());
-    System.out.println("Service path: " + path);
+    LOG.info("Service path: {}", path);
     return path;
   }
 
@@ -84,10 +108,24 @@ public class MockServiceAM extends ServiceMaster {
       throws IOException, YarnException {
     return new ServiceScheduler(context) {
 
+      @SuppressWarnings("SuspiciousMethodCalls")
       @Override
       protected YarnRegistryViewForProviders createYarnRegistryOperations(
           ServiceContext context, RegistryOperations registryClient) {
-        return mock(YarnRegistryViewForProviders.class);
+        YarnRegistryViewForProviders yarnRegistryView = mock(
+            YarnRegistryViewForProviders.class);
+        if (!registryComponents.isEmpty()) {
+          try {
+            when(yarnRegistryView.listComponents())
+                .thenReturn(new LinkedList<>(registryComponents.keySet()));
+            when(yarnRegistryView.getComponent(anyString())).thenAnswer(
+                invocation ->
+                    registryComponents.get(invocation.getArguments()[0]));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return yarnRegistryView;
       }
 
       @Override
@@ -101,7 +139,7 @@ public class MockServiceAM extends ServiceMaster {
             // add new containers if any
             synchronized (feedContainers) {
               if (feedContainers.isEmpty()) {
-                System.out.println("Allocating........ no containers");
+                LOG.info("Allocating........ no containers");
               } else {
                 // The AMRMClient will return containers for compoenent that are
                 // at FLEXING state
@@ -112,7 +150,7 @@ public class MockServiceAM extends ServiceMaster {
                   org.apache.hadoop.yarn.service.component.Component component =
                       componentsById.get(c.getAllocationRequestId());
                   if (component.getState() == ComponentState.FLEXING) {
-                    System.out.println("Allocated container " + c.getId());
+                    LOG.info("Allocated container {} ", c.getId());
                     allocatedContainers.add(c);
                     itor.remove();
                   }
@@ -121,6 +159,17 @@ public class MockServiceAM extends ServiceMaster {
               }
             }
 
+            // add recovered containers if any
+            synchronized (recoveredContainers) {
+              if (!recoveredContainers.isEmpty()) {
+                List<Container> containersFromPrevAttempt = new LinkedList<>();
+                containersFromPrevAttempt.addAll(recoveredContainers);
+                recoveredContainers.clear();
+                builder.containersFromPreviousAttempt(
+                    containersFromPrevAttempt);
+              }
+            }
+
             // add failed containers if any
             synchronized (failedContainers) {
               if (!failedContainers.isEmpty()) {
@@ -146,15 +195,23 @@ public class MockServiceAM extends ServiceMaster {
           }
         };
 
-        return AMRMClientAsync
-            .createAMRMClientAsync(client1, 1000,
+        return AMRMClientAsync.createAMRMClientAsync(client1, 1000,
                 this.new AMRMClientCallback());
       }
 
+      @SuppressWarnings("SuspiciousMethodCalls")
       @Override
       public NMClientAsync createNMClient() {
         NMClientAsync nmClientAsync = super.createNMClient();
-        nmClientAsync.setClient(mock(NMClient.class));
+        NMClient nmClient = mock(NMClient.class);
+        try {
+          when(nmClient.getContainerStatus(anyObject(), anyObject()))
+              .thenAnswer(invocation ->
+                  containerStatuses.get(invocation.getArguments()[0]));
+        } catch (YarnException | IOException e) {
+          throw new RuntimeException(e);
+        }
+        nmClientAsync.setClient(nmClient);
         return nmClientAsync;
       }
     };
@@ -165,6 +222,33 @@ public class MockServiceAM extends ServiceMaster {
     context.service = service;
   }
 
+  public void feedRegistryComponent(ContainerId containerId, String compName,
+      String compInstName) {
+    ServiceRecord record = new ServiceRecord();
+    record.set(YarnRegistryAttributes.YARN_ID, containerId.toString());
+    record.description = compInstName;
+    record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
+        PersistencePolicies.CONTAINER);
+    record.set(YarnRegistryAttributes.YARN_IP, "localhost");
+    record.set(YarnRegistryAttributes.YARN_HOSTNAME, "localhost");
+    record.set(YarnRegistryAttributes.YARN_COMPONENT, compName);
+    registryComponents.put(RegistryPathUtils.encodeYarnID(
+        containerId.toString()), record);
+  }
+
+  /**
+   * Simulates a recovered container that is sent to the AM in the heartbeat
+   * response.
+   *
+   * @param containerId The ID for the container
+   * @param compName    The component to which the recovered container is fed.
+   */
+  public void feedRecoveredContainer(ContainerId containerId, String compName) {
+    Container container = createContainer(containerId, compName);
+    recoveredContainers.add(container);
+    addContainerStatus(container, ContainerState.RUNNING);
+  }
+
   /**
    *
    * @param service The service for the component
@@ -174,20 +258,12 @@ public class MockServiceAM extends ServiceMaster {
    */
   public Container feedContainerToComp(Service service, int id,
       String compName) {
-    ApplicationId applicationId = ApplicationId.fromString(service.getId());
-    ContainerId containerId = ContainerId
-        .newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id);
-    NodeId nodeId = NodeId.newInstance("localhost", 1234);
-    Container container = Container
-        .newInstance(containerId, nodeId, "localhost",
-            Resource.newInstance(100, 1), Priority.newInstance(0), null);
-
-    long allocateId =
-        context.scheduler.getAllComponents().get(compName).getAllocateId();
-    container.setAllocationRequestId(allocateId);
+    ContainerId containerId = createContainerId(id);
+    Container container = createContainer(containerId, compName);
     synchronized (feedContainers) {
       feedContainers.add(container);
     }
+    addContainerStatus(container, ContainerState.RUNNING);
     return container;
   }
 
@@ -196,13 +272,30 @@ public class MockServiceAM extends ServiceMaster {
     ApplicationId applicationId = ApplicationId.fromString(service.getId());
     ContainerId containerId = ContainerId
         .newContainerId(ApplicationAttemptId.newInstance(applicationId, 1), id);
-    ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
-    containerStatus.setContainerId(containerId);
+    ContainerStatus status = Records.newRecord(ContainerStatus.class);
+    status.setContainerId(containerId);
     synchronized (failedContainers) {
-      failedContainers.add(containerStatus);
+      failedContainers.add(status);
     }
   }
 
+  public ContainerId createContainerId(int id) {
+    ApplicationId applicationId = ApplicationId.fromString(service.getId());
+    return ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(applicationId, 1), id);
+  }
+
+  private Container createContainer(ContainerId containerId, String compName) {
+    NodeId nodeId = NodeId.newInstance("localhost", 1234);
+    Container container = Container.newInstance(
+        containerId, nodeId, "localhost",
+        Resource.newInstance(100, 1),
+        Priority.newInstance(0), null);
+    long allocateId =
+        context.scheduler.getAllComponents().get(compName).getAllocateId();
+    container.setAllocationRequestId(allocateId);
+    return container;
+  }
 
   public void flexComponent(String compName, long numberOfContainers)
       throws IOException {
@@ -256,4 +349,13 @@ public class MockServiceAM extends ServiceMaster {
       }
     }, 1000, 20000);
   }
+
+  private void addContainerStatus(Container container, ContainerState state) {
+    ContainerStatus status = ContainerStatus.newInstance(container.getId(),
+        state, "", 0);
+    status.setHost(container.getNodeId().getHost());
+    status.setIPs(Lists.newArrayList(container.getNodeId().getHost()));
+    containerStatuses.put(container.getId(), status);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
index fb4de0d..2a3303e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceAM.java
@@ -20,15 +20,21 @@ package org.apache.hadoop.yarn.service;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.component.ComponentState;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -39,6 +45,9 @@ import static org.apache.hadoop.registry.client.api.RegistryConstants
 
 public class TestServiceAM extends ServiceTestUtils{
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestServiceAM.class);
+
   private File basedir;
   YarnConfiguration conf = new YarnConfiguration();
   TestingCluster zkCluster;
@@ -54,7 +63,7 @@ public class TestServiceAM extends ServiceTestUtils{
     zkCluster = new TestingCluster(1);
     zkCluster.start();
     conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
-    System.out.println("ZK cluster: " +  zkCluster.getConnectString());
+    LOG.info("ZK cluster: {}", zkCluster.getConnectString());
   }
 
   @After
@@ -91,7 +100,7 @@ public class TestServiceAM extends ServiceTestUtils{
     am.feedContainerToComp(exampleApp, 1, "compa");
     am.waitForCompInstanceState(compa0, ComponentInstanceState.STARTED);
 
-    System.out.println("Fail the container 1");
+    LOG.info("Fail the container 1");
     // fail the container
     am.feedFailedContainerToComp(exampleApp, 1, "compa");
 
@@ -106,4 +115,89 @@ public class TestServiceAM extends ServiceTestUtils{
         am.getComponent("compa").getPendingInstances().size());
     am.stop();
   }
+
+  // Test to verify that the containers of previous attempt are not prematurely
+  // released. These containers are sent by the RM to the AM in the
+  // heartbeat response.
+  @Test(timeout = 200000)
+  public void testContainersFromPreviousAttemptsWithRMRestart()
+      throws Exception {
+    ApplicationId applicationId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    Service exampleApp = new Service();
+    exampleApp.setId(applicationId.toString());
+    exampleApp.setName("testContainersRecovers");
+    String comp1Name = "comp1";
+    String comp1InstName = "comp1-0";
+
+    org.apache.hadoop.yarn.service.api.records.Component compA =
+        createComponent(comp1Name, 1, "sleep");
+    exampleApp.addComponent(compA);
+
+    MockServiceAM am = new MockServiceAM(exampleApp);
+    ContainerId containerId = am.createContainerId(1);
+    am.feedRegistryComponent(containerId, comp1Name, comp1InstName);
+    am.init(conf);
+    am.start();
+
+    ComponentInstance comp10 = am.getCompInstance(comp1Name, comp1InstName);
+    am.feedRecoveredContainer(containerId, comp1Name);
+    am.waitForCompInstanceState(comp10, ComponentInstanceState.STARTED);
+
+    // 0 pending instance
+    Assert.assertEquals(0,
+        am.getComponent(comp1Name).getPendingInstances().size());
+
+    GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName)
+        .getContainerStatus() != null, 2000, 200000);
+
+    Assert.assertEquals("container state",
+        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+        am.getCompInstance(comp1Name, comp1InstName).getContainerStatus()
+            .getState());
+    am.stop();
+  }
+
+  // Test to verify that the containers of previous attempt are released and the
+  // component instance is added to the pending queue when the recovery wait
+  // time interval elapses.
+  @Test(timeout = 200000)
+  public void testContainersReleasedWhenExpired()
+      throws Exception {
+    ApplicationId applicationId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    Service exampleApp = new Service();
+    exampleApp.setId(applicationId.toString());
+    exampleApp.setName("testContainersRecovers");
+    String comp1Name = "comp1";
+    String comp1InstName = "comp1-0";
+
+    org.apache.hadoop.yarn.service.api.records.Component compA =
+        createComponent(comp1Name, 1, "sleep");
+    exampleApp.addComponent(compA);
+
+    MockServiceAM am = new MockServiceAM(exampleApp);
+    ContainerId containerId = am.createContainerId(1);
+    am.feedRegistryComponent(containerId, comp1Name, comp1InstName);
+    conf.setLong(YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, 10);
+    am.init(conf);
+    am.start();
+    Thread.sleep(100);
+    GenericTestUtils.waitFor(() -> am.getComponent(comp1Name).getState().equals(
+        ComponentState.FLEXING), 100, 2000);
+
+    // 1 pending instance
+    Assert.assertEquals(1,
+        am.getComponent(comp1Name).getPendingInstances().size());
+
+    am.feedContainerToComp(exampleApp, 2, comp1Name);
+
+    GenericTestUtils.waitFor(() -> am.getCompInstance(comp1Name, comp1InstName)
+        .getContainerStatus() != null, 2000, 200000);
+    Assert.assertEquals("container state",
+        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+        am.getCompInstance(comp1Name, comp1InstName).getContainerStatus()
+            .getState());
+    am.stop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 793ad79..a3d7959 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -486,6 +486,14 @@ extends AbstractService {
      * stop() is the recommended action.
      */
     public abstract void onError(Throwable e);
+
+    /**
+     * Called when the ResourceManager responds to a heartbeat with containers
+     * from previous attempt.
+     */
+    public void onContainersReceivedFromPreviousAttempts(
+        List<Container> containers) {
+    }
   }
 
   /**
@@ -531,5 +539,7 @@ extends AbstractService {
      * @param e
      */
     void onError(Throwable e);
+
+    void onContainersReceivedFromPreviousAttempts(List<Container> containers);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 031cdec..cafb153 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -358,6 +358,11 @@ extends AMRMClientAsync<T> {
           if (!allocated.isEmpty()) {
             handler.onContainersAllocated(allocated);
           }
+
+          if (!response.getContainersFromPreviousAttempts().isEmpty()) {
+            handler.onContainersReceivedFromPreviousAttempts(
+                response.getContainersFromPreviousAttempts());
+          }
           progress = handler.getProgress();
         } catch (Throwable ex) {
           handler.onError(ex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
index 5eaa9c0..b6e7a20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
@@ -38,4 +38,5 @@ public final class YarnRegistryAttributes {
   public static final String YARN_PATH = "yarn:path";
   public static final String YARN_HOSTNAME = "yarn:hostname";
   public static final String YARN_IP = "yarn:ip";
+  public static final String YARN_COMPONENT = "yarn:component";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3ebe6a78/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md
index 561df7f..a6fd998 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/Configurations.md
@@ -125,6 +125,7 @@ Above config make the service AM to be retried at max 10 times.
 |yarn.service.log.exclude-pattern| The regex expression for excluding log files whose file name matches it when aggregating the logs after the application completes. If the log file name matches both include and exclude pattern, this file will be excluded.
 |yarn.service.rolling-log.include-pattern| The regex expression for including log files whose file name matches it when aggregating the logs while app is running.
 |yarn.service.rolling-log.exclude-pattern| The regex expression for excluding log files whose file name matches it when aggregating the logs while app is running. If the log file name matches both include and exclude pattern, this file will be excluded.
+|yarn.service.container-recovery.timeout.ms| The timeout in milliseconds after which the service AM releases all the containers of previous attempt which are not yet recovered by the RM. By default, it is set to 120000, i.e. 2 minutes.
 
 ## Constant variables for custom service
 The service framework provides some constant variables for user to configure their services. These variables are either dynamically generated by the system or are static ones such as service name defined by the user.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org