You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/10 23:32:16 UTC

[16/25] hadoop git commit: YARN-4946. RM should not consider an application as COMPLETED when log aggregation is not in a terminal state (snemeth via rkanter)

YARN-4946. RM should not consider an application as COMPLETED when log aggregation is not in a terminal state (snemeth via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2517dd6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2517dd6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2517dd6

Branch: refs/heads/HDFS-12943
Commit: b2517dd66b3c88fdd478411cf208921bd3023755
Parents: 8244abb
Author: Robert Kanter <rk...@apache.org>
Authored: Thu Aug 9 14:58:04 2018 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Thu Aug 9 14:58:04 2018 -0700

----------------------------------------------------------------------
 .../server/resourcemanager/RMAppManager.java    |  81 +++++--
 .../server/resourcemanager/rmapp/RMApp.java     |   6 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |   8 +-
 .../server/resourcemanager/TestAppManager.java  | 241 +++++++++++++++----
 .../applicationsmanager/MockAsm.java            |  11 +
 .../server/resourcemanager/rmapp/MockRMApp.java |  20 ++
 6 files changed, 294 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 7011aaa..ee78c08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -86,7 +86,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
   private int maxCompletedAppsInMemory;
   private int maxCompletedAppsInStateStore;
   protected int completedAppsInStateStore = 0;
-  private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
+  protected LinkedList<ApplicationId> completedApps = new LinkedList<>();
 
   private final RMContext rmContext;
   private final ApplicationMasterService masterService;
@@ -284,31 +284,72 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
    * check to see if hit the limit for max # completed apps kept
    */
   protected synchronized void checkAppNumCompletedLimit() {
-    // check apps kept in state store.
-    while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
-      ApplicationId removeId =
-          completedApps.get(completedApps.size() - completedAppsInStateStore);
+    if (completedAppsInStateStore > maxCompletedAppsInStateStore) {
+      removeCompletedAppsFromStateStore();
+    }
+
+    if (completedApps.size() > maxCompletedAppsInMemory) {
+      removeCompletedAppsFromMemory();
+    }
+  }
+
+  private void removeCompletedAppsFromStateStore() {
+    int numDelete = completedAppsInStateStore - maxCompletedAppsInStateStore;
+    for (int i = 0; i < numDelete; i++) {
+      ApplicationId removeId = completedApps.get(i);
       RMApp removeApp = rmContext.getRMApps().get(removeId);
-      LOG.info("Max number of completed apps kept in state store met:"
-          + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
-          + ", removing app " + removeApp.getApplicationId()
-          + " from state store.");
-      rmContext.getStateStore().removeApplication(removeApp);
-      completedAppsInStateStore--;
+      boolean deleteApp = shouldDeleteApp(removeApp);
+
+      if (deleteApp) {
+        LOG.info("Max number of completed apps kept in state store met:"
+            + " maxCompletedAppsInStateStore = "
+            + maxCompletedAppsInStateStore + ", removing app " + removeId
+            + " from state store.");
+        rmContext.getStateStore().removeApplication(removeApp);
+        completedAppsInStateStore--;
+      } else {
+        LOG.info("Max number of completed apps kept in state store met:"
+            + " maxCompletedAppsInStateStore = "
+            + maxCompletedAppsInStateStore + ", but not removing app "
+            + removeId
+            + " from state store as log aggregation have not finished yet.");
+      }
     }
+  }
 
-    // check apps kept in memorty.
-    while (completedApps.size() > this.maxCompletedAppsInMemory) {
-      ApplicationId removeId = completedApps.remove();
-      LOG.info("Application should be expired, max number of completed apps"
-          + " kept in memory met: maxCompletedAppsInMemory = "
-          + this.maxCompletedAppsInMemory + ", removing app " + removeId
-          + " from memory: ");
-      rmContext.getRMApps().remove(removeId);
-      this.applicationACLsManager.removeApplication(removeId);
+  private void removeCompletedAppsFromMemory() {
+    int numDelete = completedApps.size() - maxCompletedAppsInMemory;
+    int offset = 0;
+    for (int i = 0; i < numDelete; i++) {
+      int deletionIdx = i - offset;
+      ApplicationId removeId = completedApps.get(deletionIdx);
+      RMApp removeApp = rmContext.getRMApps().get(removeId);
+      boolean deleteApp = shouldDeleteApp(removeApp);
+
+      if (deleteApp) {
+        ++offset;
+        LOG.info("Application should be expired, max number of completed apps"
+                + " kept in memory met: maxCompletedAppsInMemory = "
+                + this.maxCompletedAppsInMemory + ", removing app " + removeId
+                + " from memory: ");
+        completedApps.remove(deletionIdx);
+        rmContext.getRMApps().remove(removeId);
+        this.applicationACLsManager.removeApplication(removeId);
+      } else {
+        LOG.info("Application should be expired, max number of completed apps"
+                + " kept in memory met: maxCompletedAppsInMemory = "
+                + this.maxCompletedAppsInMemory + ", but not removing app "
+                + removeId
+                + " from memory as log aggregation have not finished yet.");
+      }
     }
   }
 
+  private boolean shouldDeleteApp(RMApp app) {
+    return !app.isLogAggregationEnabled()
+            || app.isLogAggregationFinished();
+  }
+
   @SuppressWarnings("unchecked")
   protected void submitApplication(
       ApplicationSubmissionContext submissionContext, long submitTime,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 99cce87..535888c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -242,7 +242,11 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * @return the number of max attempts of the application.
    */
   int getMaxAppAttempts();
-  
+
+  boolean isLogAggregationEnabled();
+
+  boolean isLogAggregationFinished();
+
   /**
    * Returns the application type
    * @return the application type.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 9f1ea44..42e2bcf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1912,7 +1912,13 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
-  private boolean isLogAggregationFinished() {
+  @Override
+  public boolean isLogAggregationEnabled() {
+    return logAggregationEnabled;
+  }
+
+  @Override
+  public boolean isLogAggregationFinished() {
     return this.logAggregationStatusForAppReport
       .equals(LogAggregationStatus.SUCCEEDED)
         || this.logAggregationStatusForAppReport

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 6a6f9cf..27e87bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -19,28 +19,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Matchers.matches;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -88,28 +69,48 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptI
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .ManagedParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Testing applications being retired from RM.
@@ -131,7 +132,7 @@ public class TestAppManager{
   } 
 
 
-  public static List<RMApp> newRMApps(int n, long time, RMAppState state) {
+  private static List<RMApp> newRMApps(int n, long time, RMAppState state) {
     List<RMApp> list = Lists.newArrayList();
     for (int i = 0; i < n; ++i) {
       list.add(new MockRMApp(i, time, state));
@@ -139,23 +140,63 @@ public class TestAppManager{
     return list;
   }
 
+  private static List<RMApp> newRMAppsMixedLogAggregationStatus(int n,
+      long time, RMAppState state) {
+    List<RMApp> list = Lists.newArrayList();
+    for (int i = 0; i < n; ++i) {
+      MockRMApp rmApp = new MockRMApp(i, time, state);
+      rmApp.setLogAggregationEnabled(true);
+      rmApp.setLogAggregationFinished(i % 2 == 0);
+      list.add(rmApp);
+    }
+    return list;
+  }
+
   public RMContext mockRMContext(int n, long time) {
+    final ConcurrentMap<ApplicationId, RMApp> map = createRMAppsMap(n, time);
+    return createMockRMContextInternal(map);
+  }
+
+  public RMContext mockRMContextWithMixedLogAggregationStatus(int n,
+      long time) {
+    final ConcurrentMap<ApplicationId, RMApp> map =
+        createRMAppsMapMixedLogAggStatus(n, time);
+    return createMockRMContextInternal(map);
+  }
+
+  private ConcurrentMap<ApplicationId, RMApp> createRMAppsMap(int n,
+      long time) {
     final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
     final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
     for (RMApp app : apps) {
       map.put(app.getApplicationId(), app);
     }
+    return map;
+  }
+
+  private ConcurrentMap<ApplicationId, RMApp> createRMAppsMapMixedLogAggStatus(
+      int n, long time) {
+    final List<RMApp> apps =
+        newRMAppsMixedLogAggregationStatus(n, time, RMAppState.FINISHED);
+    final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
+    for (RMApp app : apps) {
+      map.put(app.getApplicationId(), app);
+    }
+    return map;
+  }
+
+  private RMContext createMockRMContextInternal(ConcurrentMap<ApplicationId, RMApp> map) {
     Dispatcher rmDispatcher = new AsyncDispatcher();
     ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
-        rmDispatcher);
+            rmDispatcher);
     AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
-        rmDispatcher);
+            rmDispatcher);
     AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
-        rmDispatcher);
+            rmDispatcher);
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     RMContext context = new RMContextImpl(rmDispatcher,
-        containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
-        null, null, null, null, null) {
+            containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
+            null, null, null, null, null) {
       @Override
       public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
         return map;
@@ -198,9 +239,11 @@ public class TestAppManager{
 
   // Extend and make the functions we want to test public
   public class TestRMAppManager extends RMAppManager {
+    private final RMStateStore stateStore;
 
     public TestRMAppManager(RMContext context, Configuration conf) {
       super(context, null, null, new ApplicationACLsManager(conf), conf);
+      this.stateStore = context.getStateStore();
     }
 
     public TestRMAppManager(RMContext context,
@@ -208,6 +251,7 @@ public class TestAppManager{
         YarnScheduler scheduler, ApplicationMasterService masterService,
         ApplicationACLsManager applicationACLsManager, Configuration conf) {
       super(context, scheduler, masterService, applicationACLsManager, conf);
+      this.stateStore = context.getStateStore();
     }
 
     public void checkAppNumCompletedLimit() {
@@ -222,10 +266,32 @@ public class TestAppManager{
       return super.getCompletedAppsListSize();
     }
 
-    public int getCompletedAppsInStateStore() {
+    public int getNumberOfCompletedAppsInStateStore() {
       return this.completedAppsInStateStore;
     }
 
+    List<ApplicationId> getCompletedApps() {
+      return completedApps;
+    }
+
+    Set<ApplicationId> getFirstNCompletedApps(int n) {
+      return getCompletedApps().stream().limit(n).collect(toSet());
+    }
+
+    Set<ApplicationId> getCompletedAppsWithEvenIdsInRange(int n) {
+      return getCompletedApps().stream().limit(n)
+          .filter(app -> app.getId() % 2 == 0).collect(toSet());
+    }
+
+    Set<ApplicationId> getRemovedAppsFromStateStore(int numRemoves) {
+      ArgumentCaptor<RMApp> argumentCaptor =
+          ArgumentCaptor.forClass(RMApp.class);
+      verify(stateStore, times(numRemoves))
+          .removeApplication(argumentCaptor.capture());
+      return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId)
+          .collect(toSet());
+    }
+
     public void submitApplication(
         ApplicationSubmissionContext submissionContext, String user)
             throws YarnException, IOException {
@@ -234,10 +300,14 @@ public class TestAppManager{
     }
   }
 
-  protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) {
-    for (RMApp app : rmContext.getRMApps().values()) {
+  private void addToCompletedApps(TestRMAppManager appMonitor,
+          RMContext rmContext) {
+    // ensure applications are finished in order by their IDs
+    List<RMApp> sortedApps = new ArrayList<>(rmContext.getRMApps().values());
+    sortedApps.sort(Comparator.comparingInt(o -> o.getApplicationId().getId()));
+    for (RMApp app : sortedApps) {
       if (app.getState() == RMAppState.FINISHED
-          || app.getState() == RMAppState.KILLED 
+          || app.getState() == RMAppState.KILLED
           || app.getState() == RMAppState.FAILED) {
         appMonitor.finishApplication(app.getApplicationId());
       }
@@ -631,7 +701,8 @@ public class TestAppManager{
   @Test
   public void testStateStoreAppLimitLessThanMemoryAppLimit() {
     long now = System.currentTimeMillis();
-    RMContext rmContext = mockRMContext(10, now - 20000);
+    final int allApps = 10;
+    RMContext rmContext = mockRMContext(allApps, now - 20000);
     Configuration conf = new YarnConfiguration();
     int maxAppsInMemory = 8;
     int maxAppsInStateStore = 4;
@@ -641,39 +712,57 @@ public class TestAppManager{
     TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
 
     addToCompletedApps(appMonitor, rmContext);
-    Assert.assertEquals("Number of completed apps incorrect", 10,
+    Assert.assertEquals("Number of completed apps incorrect", allApps,
         appMonitor.getCompletedAppsListSize());
+
+    int numRemoveAppsFromStateStore = allApps - maxAppsInStateStore;
+    Set<ApplicationId> appsShouldBeRemovedFromStateStore = appMonitor
+            .getFirstNCompletedApps(numRemoveAppsFromStateStore);
     appMonitor.checkAppNumCompletedLimit();
 
+    Set<ApplicationId> removedAppsFromStateStore = appMonitor
+            .getRemovedAppsFromStateStore(numRemoveAppsFromStateStore);
+
     Assert.assertEquals("Number of apps incorrect after # completed check",
       maxAppsInMemory, rmContext.getRMApps().size());
     Assert.assertEquals("Number of completed apps incorrect after check",
       maxAppsInMemory, appMonitor.getCompletedAppsListSize());
 
-    int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
     verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
       .removeApplication(isA(RMApp.class));
     Assert.assertEquals(maxAppsInStateStore,
-      appMonitor.getCompletedAppsInStateStore());
+      appMonitor.getNumberOfCompletedAppsInStateStore());
+
+    List<ApplicationId> completedApps = appMonitor.getCompletedApps();
+    Assert.assertEquals(maxAppsInMemory, completedApps.size());
+    Assert.assertEquals(numRemoveAppsFromStateStore,
+        removedAppsFromStateStore.size());
+    Assert.assertEquals(numRemoveAppsFromStateStore,
+        Sets.intersection(appsShouldBeRemovedFromStateStore,
+            removedAppsFromStateStore).size());
   }
 
   @Test
-  public void testStateStoreAppLimitLargerThanMemoryAppLimit() {
+  public void testStateStoreAppLimitGreaterThanMemoryAppLimit() {
     long now = System.currentTimeMillis();
-    RMContext rmContext = mockRMContext(10, now - 20000);
+    final int allApps = 10;
+    RMContext rmContext = mockRMContext(allApps, now - 20000);
     Configuration conf = new YarnConfiguration();
     int maxAppsInMemory = 8;
     conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
-    // larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS.
+    // greater than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS.
     conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000);
     TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
 
     addToCompletedApps(appMonitor, rmContext);
-    Assert.assertEquals("Number of completed apps incorrect", 10,
+    Assert.assertEquals("Number of completed apps incorrect", allApps,
         appMonitor.getCompletedAppsListSize());
+
+    int numRemoveApps = allApps - maxAppsInMemory;
+    Set<ApplicationId> appsShouldBeRemoved = appMonitor
+            .getFirstNCompletedApps(numRemoveApps);
     appMonitor.checkAppNumCompletedLimit();
 
-    int numRemoveApps = 10 - maxAppsInMemory;
     Assert.assertEquals("Number of apps incorrect after # completed check",
       maxAppsInMemory, rmContext.getRMApps().size());
     Assert.assertEquals("Number of completed apps incorrect after check",
@@ -681,7 +770,57 @@ public class TestAppManager{
     verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication(
       isA(RMApp.class));
     Assert.assertEquals(maxAppsInMemory,
-      appMonitor.getCompletedAppsInStateStore());
+      appMonitor.getNumberOfCompletedAppsInStateStore());
+
+    List<ApplicationId> completedApps = appMonitor.getCompletedApps();
+    Assert.assertEquals(maxAppsInMemory, completedApps.size());
+    Assert.assertEquals(numRemoveApps, appsShouldBeRemoved.size());
+    assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
+  }
+
+  @Test
+  public void testStateStoreAppLimitSomeAppsHaveNotFinishedLogAggregation() {
+    long now = System.currentTimeMillis();
+    final int allApps = 10;
+    RMContext rmContext =
+        mockRMContextWithMixedLogAggregationStatus(allApps, now - 20000);
+    Configuration conf = new YarnConfiguration();
+    int maxAppsInMemory = 2;
+    conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
+        maxAppsInMemory);
+    // greater than maxCompletedAppsInMemory, reset to
+    // RM_MAX_COMPLETED_APPLICATIONS.
+    conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
+        1000);
+    TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
+
+    addToCompletedApps(appMonitor, rmContext);
+    Assert.assertEquals("Number of completed apps incorrect", allApps,
+            appMonitor.getCompletedAppsListSize());
+
+    int numRemoveApps = allApps - maxAppsInMemory;
+    int effectiveNumRemoveApps = numRemoveApps / 2;
+    //only apps with even ID would be deleted due to log aggregation status
+    int expectedNumberOfAppsInMemory = maxAppsInMemory + effectiveNumRemoveApps;
+
+    Set<ApplicationId> appsShouldBeRemoved = appMonitor
+            .getCompletedAppsWithEvenIdsInRange(numRemoveApps);
+    appMonitor.checkAppNumCompletedLimit();
+
+    Assert.assertEquals("Number of apps incorrect after # completed check",
+        expectedNumberOfAppsInMemory, rmContext.getRMApps().size());
+    Assert.assertEquals("Number of completed apps incorrect after check",
+        expectedNumberOfAppsInMemory, appMonitor.getCompletedAppsListSize());
+    verify(rmContext.getStateStore(), times(effectiveNumRemoveApps))
+        .removeApplication(isA(RMApp.class));
+    Assert.assertEquals(expectedNumberOfAppsInMemory,
+        appMonitor.getNumberOfCompletedAppsInStateStore());
+
+    List<ApplicationId> completedApps = appMonitor.getCompletedApps();
+
+    Assert.assertEquals(expectedNumberOfAppsInMemory, completedApps.size());
+    Assert.assertEquals(effectiveNumRemoveApps, appsShouldBeRemoved.size());
+    assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
   }
 
   protected void setupDispatcher(RMContext rmContext, Configuration conf) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 6c6c4b4..342dab8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -146,6 +146,17 @@ public abstract class MockAsm extends MockApps {
     public int getMaxAppAttempts() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+
+    @Override
+    public boolean isLogAggregationEnabled() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public boolean isLogAggregationFinished() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
     @Override
     public ApplicationReport createAndGetApplicationReport(
         String clientUserName,boolean allowAccess) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index ad29d27..32ece34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -70,6 +70,8 @@ public class MockRMApp implements RMApp {
   int maxAppAttempts = 1;
   List<ResourceRequest> amReqs;
   private Set<String> applicationTags = null;
+  private boolean logAggregationEnabled;
+  private boolean logAggregationFinished;
 
   public MockRMApp(int newid, long time, RMAppState newState) {
     finish = time;
@@ -236,6 +238,24 @@ public class MockRMApp implements RMApp {
     return maxAppAttempts;
   }
 
+  @Override
+  public boolean isLogAggregationEnabled() {
+    return logAggregationEnabled;
+  }
+
+  @Override
+  public boolean isLogAggregationFinished() {
+    return logAggregationFinished;
+  }
+
+  public void setLogAggregationEnabled(boolean enabled) {
+    this.logAggregationEnabled = enabled;
+  }
+
+  public void setLogAggregationFinished(boolean finished) {
+    this.logAggregationFinished = finished;
+  }
+
   public void setNumMaxRetries(int maxAppAttempts) {
     this.maxAppAttempts = maxAppAttempts;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org