You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by eh...@apache.org on 2018/08/10 11:35:39 UTC
[45/50] [abbrv] hadoop git commit: YARN-4946. RM should not consider
an application as COMPLETED when log aggregation is not in a terminal state
(snemeth via rkanter)
YARN-4946. RM should not consider an application as COMPLETED when log aggregation is not in a terminal state (snemeth via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2517dd6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2517dd6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2517dd6
Branch: refs/heads/HDFS-12090
Commit: b2517dd66b3c88fdd478411cf208921bd3023755
Parents: 8244abb
Author: Robert Kanter <rk...@apache.org>
Authored: Thu Aug 9 14:58:04 2018 -0700
Committer: Robert Kanter <rk...@apache.org>
Committed: Thu Aug 9 14:58:04 2018 -0700
----------------------------------------------------------------------
.../server/resourcemanager/RMAppManager.java | 81 +++++--
.../server/resourcemanager/rmapp/RMApp.java | 6 +-
.../server/resourcemanager/rmapp/RMAppImpl.java | 8 +-
.../server/resourcemanager/TestAppManager.java | 241 +++++++++++++++----
.../applicationsmanager/MockAsm.java | 11 +
.../server/resourcemanager/rmapp/MockRMApp.java | 20 ++
6 files changed, 294 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 7011aaa..ee78c08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -86,7 +86,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private int maxCompletedAppsInMemory;
private int maxCompletedAppsInStateStore;
protected int completedAppsInStateStore = 0;
- private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
+ protected LinkedList<ApplicationId> completedApps = new LinkedList<>();
private final RMContext rmContext;
private final ApplicationMasterService masterService;
@@ -284,31 +284,72 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
* check to see if hit the limit for max # completed apps kept
*/
protected synchronized void checkAppNumCompletedLimit() {
- // check apps kept in state store.
- while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
- ApplicationId removeId =
- completedApps.get(completedApps.size() - completedAppsInStateStore);
+ if (completedAppsInStateStore > maxCompletedAppsInStateStore) {
+ removeCompletedAppsFromStateStore();
+ }
+
+ if (completedApps.size() > maxCompletedAppsInMemory) {
+ removeCompletedAppsFromMemory();
+ }
+ }
+
+ private void removeCompletedAppsFromStateStore() {
+ int numDelete = completedAppsInStateStore - maxCompletedAppsInStateStore;
+ for (int i = 0; i < numDelete; i++) {
+ ApplicationId removeId = completedApps.get(i);
RMApp removeApp = rmContext.getRMApps().get(removeId);
- LOG.info("Max number of completed apps kept in state store met:"
- + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
- + ", removing app " + removeApp.getApplicationId()
- + " from state store.");
- rmContext.getStateStore().removeApplication(removeApp);
- completedAppsInStateStore--;
+ boolean deleteApp = shouldDeleteApp(removeApp);
+
+ if (deleteApp) {
+ LOG.info("Max number of completed apps kept in state store met:"
+ + " maxCompletedAppsInStateStore = "
+ + maxCompletedAppsInStateStore + ", removing app " + removeId
+ + " from state store.");
+ rmContext.getStateStore().removeApplication(removeApp);
+ completedAppsInStateStore--;
+ } else {
+ LOG.info("Max number of completed apps kept in state store met:"
+ + " maxCompletedAppsInStateStore = "
+ + maxCompletedAppsInStateStore + ", but not removing app "
+ + removeId
+ + " from state store as log aggregation have not finished yet.");
+ }
}
+ }
- // check apps kept in memorty.
- while (completedApps.size() > this.maxCompletedAppsInMemory) {
- ApplicationId removeId = completedApps.remove();
- LOG.info("Application should be expired, max number of completed apps"
- + " kept in memory met: maxCompletedAppsInMemory = "
- + this.maxCompletedAppsInMemory + ", removing app " + removeId
- + " from memory: ");
- rmContext.getRMApps().remove(removeId);
- this.applicationACLsManager.removeApplication(removeId);
+ private void removeCompletedAppsFromMemory() {
+ int numDelete = completedApps.size() - maxCompletedAppsInMemory;
+ int offset = 0;
+ for (int i = 0; i < numDelete; i++) {
+ int deletionIdx = i - offset;
+ ApplicationId removeId = completedApps.get(deletionIdx);
+ RMApp removeApp = rmContext.getRMApps().get(removeId);
+ boolean deleteApp = shouldDeleteApp(removeApp);
+
+ if (deleteApp) {
+ ++offset;
+ LOG.info("Application should be expired, max number of completed apps"
+ + " kept in memory met: maxCompletedAppsInMemory = "
+ + this.maxCompletedAppsInMemory + ", removing app " + removeId
+ + " from memory: ");
+ completedApps.remove(deletionIdx);
+ rmContext.getRMApps().remove(removeId);
+ this.applicationACLsManager.removeApplication(removeId);
+ } else {
+ LOG.info("Application should be expired, max number of completed apps"
+ + " kept in memory met: maxCompletedAppsInMemory = "
+ + this.maxCompletedAppsInMemory + ", but not removing app "
+ + removeId
+ + " from memory as log aggregation have not finished yet.");
+ }
}
}
+ private boolean shouldDeleteApp(RMApp app) {
+ return !app.isLogAggregationEnabled()
+ || app.isLogAggregationFinished();
+ }
+
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 99cce87..535888c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -242,7 +242,11 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return the number of max attempts of the application.
*/
int getMaxAppAttempts();
-
+
+ boolean isLogAggregationEnabled();
+
+ boolean isLogAggregationFinished();
+
/**
* Returns the application type
* @return the application type.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 9f1ea44..42e2bcf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1912,7 +1912,13 @@ public class RMAppImpl implements RMApp, Recoverable {
}
}
- private boolean isLogAggregationFinished() {
+ @Override
+ public boolean isLogAggregationEnabled() {
+ return logAggregationEnabled;
+ }
+
+ @Override
+ public boolean isLogAggregationFinished() {
return this.logAggregationStatusForAppReport
.equals(LogAggregationStatus.SUCCEEDED)
|| this.logAggregationStatusForAppReport
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 6a6f9cf..27e87bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -19,28 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Matchers.matches;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -88,28 +69,48 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptI
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .ManagedParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Testing applications being retired from RM.
@@ -131,7 +132,7 @@ public class TestAppManager{
}
- public static List<RMApp> newRMApps(int n, long time, RMAppState state) {
+ private static List<RMApp> newRMApps(int n, long time, RMAppState state) {
List<RMApp> list = Lists.newArrayList();
for (int i = 0; i < n; ++i) {
list.add(new MockRMApp(i, time, state));
@@ -139,23 +140,63 @@ public class TestAppManager{
return list;
}
+ private static List<RMApp> newRMAppsMixedLogAggregationStatus(int n,
+ long time, RMAppState state) {
+ List<RMApp> list = Lists.newArrayList();
+ for (int i = 0; i < n; ++i) {
+ MockRMApp rmApp = new MockRMApp(i, time, state);
+ rmApp.setLogAggregationEnabled(true);
+ rmApp.setLogAggregationFinished(i % 2 == 0);
+ list.add(rmApp);
+ }
+ return list;
+ }
+
public RMContext mockRMContext(int n, long time) {
+ final ConcurrentMap<ApplicationId, RMApp> map = createRMAppsMap(n, time);
+ return createMockRMContextInternal(map);
+ }
+
+ public RMContext mockRMContextWithMixedLogAggregationStatus(int n,
+ long time) {
+ final ConcurrentMap<ApplicationId, RMApp> map =
+ createRMAppsMapMixedLogAggStatus(n, time);
+ return createMockRMContextInternal(map);
+ }
+
+ private ConcurrentMap<ApplicationId, RMApp> createRMAppsMap(int n,
+ long time) {
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
for (RMApp app : apps) {
map.put(app.getApplicationId(), app);
}
+ return map;
+ }
+
+ private ConcurrentMap<ApplicationId, RMApp> createRMAppsMapMixedLogAggStatus(
+ int n, long time) {
+ final List<RMApp> apps =
+ newRMAppsMixedLogAggregationStatus(n, time, RMAppState.FINISHED);
+ final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
+ for (RMApp app : apps) {
+ map.put(app.getApplicationId(), app);
+ }
+ return map;
+ }
+
+ private RMContext createMockRMContextInternal(ConcurrentMap<ApplicationId, RMApp> map) {
Dispatcher rmDispatcher = new AsyncDispatcher();
ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
- rmDispatcher);
+ rmDispatcher);
AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
- rmDispatcher);
+ rmDispatcher);
AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(
- rmDispatcher);
+ rmDispatcher);
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext context = new RMContextImpl(rmDispatcher,
- containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
- null, null, null, null, null) {
+ containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
+ null, null, null, null, null) {
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return map;
@@ -198,9 +239,11 @@ public class TestAppManager{
// Extend and make the functions we want to test public
public class TestRMAppManager extends RMAppManager {
+ private final RMStateStore stateStore;
public TestRMAppManager(RMContext context, Configuration conf) {
super(context, null, null, new ApplicationACLsManager(conf), conf);
+ this.stateStore = context.getStateStore();
}
public TestRMAppManager(RMContext context,
@@ -208,6 +251,7 @@ public class TestAppManager{
YarnScheduler scheduler, ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
super(context, scheduler, masterService, applicationACLsManager, conf);
+ this.stateStore = context.getStateStore();
}
public void checkAppNumCompletedLimit() {
@@ -222,10 +266,32 @@ public class TestAppManager{
return super.getCompletedAppsListSize();
}
- public int getCompletedAppsInStateStore() {
+ public int getNumberOfCompletedAppsInStateStore() {
return this.completedAppsInStateStore;
}
+ List<ApplicationId> getCompletedApps() {
+ return completedApps;
+ }
+
+ Set<ApplicationId> getFirstNCompletedApps(int n) {
+ return getCompletedApps().stream().limit(n).collect(toSet());
+ }
+
+ Set<ApplicationId> getCompletedAppsWithEvenIdsInRange(int n) {
+ return getCompletedApps().stream().limit(n)
+ .filter(app -> app.getId() % 2 == 0).collect(toSet());
+ }
+
+ Set<ApplicationId> getRemovedAppsFromStateStore(int numRemoves) {
+ ArgumentCaptor<RMApp> argumentCaptor =
+ ArgumentCaptor.forClass(RMApp.class);
+ verify(stateStore, times(numRemoves))
+ .removeApplication(argumentCaptor.capture());
+ return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId)
+ .collect(toSet());
+ }
+
public void submitApplication(
ApplicationSubmissionContext submissionContext, String user)
throws YarnException, IOException {
@@ -234,10 +300,14 @@ public class TestAppManager{
}
}
- protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) {
- for (RMApp app : rmContext.getRMApps().values()) {
+ private void addToCompletedApps(TestRMAppManager appMonitor,
+ RMContext rmContext) {
+ // ensure applications are finished in order by their IDs
+ List<RMApp> sortedApps = new ArrayList<>(rmContext.getRMApps().values());
+ sortedApps.sort(Comparator.comparingInt(o -> o.getApplicationId().getId()));
+ for (RMApp app : sortedApps) {
if (app.getState() == RMAppState.FINISHED
- || app.getState() == RMAppState.KILLED
+ || app.getState() == RMAppState.KILLED
|| app.getState() == RMAppState.FAILED) {
appMonitor.finishApplication(app.getApplicationId());
}
@@ -631,7 +701,8 @@ public class TestAppManager{
@Test
public void testStateStoreAppLimitLessThanMemoryAppLimit() {
long now = System.currentTimeMillis();
- RMContext rmContext = mockRMContext(10, now - 20000);
+ final int allApps = 10;
+ RMContext rmContext = mockRMContext(allApps, now - 20000);
Configuration conf = new YarnConfiguration();
int maxAppsInMemory = 8;
int maxAppsInStateStore = 4;
@@ -641,39 +712,57 @@ public class TestAppManager{
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
addToCompletedApps(appMonitor, rmContext);
- Assert.assertEquals("Number of completed apps incorrect", 10,
+ Assert.assertEquals("Number of completed apps incorrect", allApps,
appMonitor.getCompletedAppsListSize());
+
+ int numRemoveAppsFromStateStore = allApps - maxAppsInStateStore;
+ Set<ApplicationId> appsShouldBeRemovedFromStateStore = appMonitor
+ .getFirstNCompletedApps(numRemoveAppsFromStateStore);
appMonitor.checkAppNumCompletedLimit();
+ Set<ApplicationId> removedAppsFromStateStore = appMonitor
+ .getRemovedAppsFromStateStore(numRemoveAppsFromStateStore);
+
Assert.assertEquals("Number of apps incorrect after # completed check",
maxAppsInMemory, rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check",
maxAppsInMemory, appMonitor.getCompletedAppsListSize());
- int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
.removeApplication(isA(RMApp.class));
Assert.assertEquals(maxAppsInStateStore,
- appMonitor.getCompletedAppsInStateStore());
+ appMonitor.getNumberOfCompletedAppsInStateStore());
+
+ List<ApplicationId> completedApps = appMonitor.getCompletedApps();
+ Assert.assertEquals(maxAppsInMemory, completedApps.size());
+ Assert.assertEquals(numRemoveAppsFromStateStore,
+ removedAppsFromStateStore.size());
+ Assert.assertEquals(numRemoveAppsFromStateStore,
+ Sets.intersection(appsShouldBeRemovedFromStateStore,
+ removedAppsFromStateStore).size());
}
@Test
- public void testStateStoreAppLimitLargerThanMemoryAppLimit() {
+ public void testStateStoreAppLimitGreaterThanMemoryAppLimit() {
long now = System.currentTimeMillis();
- RMContext rmContext = mockRMContext(10, now - 20000);
+ final int allApps = 10;
+ RMContext rmContext = mockRMContext(allApps, now - 20000);
Configuration conf = new YarnConfiguration();
int maxAppsInMemory = 8;
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
- // larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS.
+ // state store limit is greater than maxCompletedAppsInMemory, so it is reset to RM_MAX_COMPLETED_APPLICATIONS.
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
addToCompletedApps(appMonitor, rmContext);
- Assert.assertEquals("Number of completed apps incorrect", 10,
+ Assert.assertEquals("Number of completed apps incorrect", allApps,
appMonitor.getCompletedAppsListSize());
+
+ int numRemoveApps = allApps - maxAppsInMemory;
+ Set<ApplicationId> appsShouldBeRemoved = appMonitor
+ .getFirstNCompletedApps(numRemoveApps);
appMonitor.checkAppNumCompletedLimit();
- int numRemoveApps = 10 - maxAppsInMemory;
Assert.assertEquals("Number of apps incorrect after # completed check",
maxAppsInMemory, rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check",
@@ -681,7 +770,57 @@ public class TestAppManager{
verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication(
isA(RMApp.class));
Assert.assertEquals(maxAppsInMemory,
- appMonitor.getCompletedAppsInStateStore());
+ appMonitor.getNumberOfCompletedAppsInStateStore());
+
+ List<ApplicationId> completedApps = appMonitor.getCompletedApps();
+ Assert.assertEquals(maxAppsInMemory, completedApps.size());
+ Assert.assertEquals(numRemoveApps, appsShouldBeRemoved.size());
+ assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
+ }
+
+ @Test
+ public void testStateStoreAppLimitSomeAppsHaveNotFinishedLogAggregation() {
+ long now = System.currentTimeMillis();
+ final int allApps = 10;
+ RMContext rmContext =
+ mockRMContextWithMixedLogAggregationStatus(allApps, now - 20000);
+ Configuration conf = new YarnConfiguration();
+ int maxAppsInMemory = 2;
+ conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
+ maxAppsInMemory);
+ // state store limit is greater than maxCompletedAppsInMemory, so it is
+ // reset to RM_MAX_COMPLETED_APPLICATIONS.
+ conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
+ 1000);
+ TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
+
+ addToCompletedApps(appMonitor, rmContext);
+ Assert.assertEquals("Number of completed apps incorrect", allApps,
+ appMonitor.getCompletedAppsListSize());
+
+ int numRemoveApps = allApps - maxAppsInMemory;
+ int effectiveNumRemoveApps = numRemoveApps / 2;
+ // only apps with even IDs can be deleted, since log aggregation has finished only for them
+ int expectedNumberOfAppsInMemory = maxAppsInMemory + effectiveNumRemoveApps;
+
+ Set<ApplicationId> appsShouldBeRemoved = appMonitor
+ .getCompletedAppsWithEvenIdsInRange(numRemoveApps);
+ appMonitor.checkAppNumCompletedLimit();
+
+ Assert.assertEquals("Number of apps incorrect after # completed check",
+ expectedNumberOfAppsInMemory, rmContext.getRMApps().size());
+ Assert.assertEquals("Number of completed apps incorrect after check",
+ expectedNumberOfAppsInMemory, appMonitor.getCompletedAppsListSize());
+ verify(rmContext.getStateStore(), times(effectiveNumRemoveApps))
+ .removeApplication(isA(RMApp.class));
+ Assert.assertEquals(expectedNumberOfAppsInMemory,
+ appMonitor.getNumberOfCompletedAppsInStateStore());
+
+ List<ApplicationId> completedApps = appMonitor.getCompletedApps();
+
+ Assert.assertEquals(expectedNumberOfAppsInMemory, completedApps.size());
+ Assert.assertEquals(effectiveNumRemoveApps, appsShouldBeRemoved.size());
+ assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
}
protected void setupDispatcher(RMContext rmContext, Configuration conf) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 6c6c4b4..342dab8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -146,6 +146,17 @@ public abstract class MockAsm extends MockApps {
public int getMaxAppAttempts() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public boolean isLogAggregationEnabled() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public boolean isLogAggregationFinished() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
@Override
public ApplicationReport createAndGetApplicationReport(
String clientUserName,boolean allowAccess) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2517dd6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index ad29d27..32ece34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -70,6 +70,8 @@ public class MockRMApp implements RMApp {
int maxAppAttempts = 1;
List<ResourceRequest> amReqs;
private Set<String> applicationTags = null;
+ private boolean logAggregationEnabled;
+ private boolean logAggregationFinished;
public MockRMApp(int newid, long time, RMAppState newState) {
finish = time;
@@ -236,6 +238,24 @@ public class MockRMApp implements RMApp {
return maxAppAttempts;
}
+ @Override
+ public boolean isLogAggregationEnabled() {
+ return logAggregationEnabled;
+ }
+
+ @Override
+ public boolean isLogAggregationFinished() {
+ return logAggregationFinished;
+ }
+
+ public void setLogAggregationEnabled(boolean enabled) {
+ this.logAggregationEnabled = enabled;
+ }
+
+ public void setLogAggregationFinished(boolean finished) {
+ this.logAggregationFinished = finished;
+ }
+
public void setNumMaxRetries(int maxAppAttempts) {
this.maxAppAttempts = maxAppAttempts;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org