You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/01 19:55:12 UTC
[11/39] hadoop git commit: YARN-4987. Read cache concurrency issue between read and evict in EntityGroupFS timeline store. Contributed by Li Lu.
YARN-4987. Read cache concurrency issue between read and evict in EntityGroupFS timeline store. Contributed by Li Lu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/705286cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/705286cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/705286cc
Branch: refs/heads/HDFS-1312
Commit: 705286ccaeea36941d97ec1c1700746b74264924
Parents: bde819a
Author: Junping Du <ju...@apache.org>
Authored: Fri May 27 06:58:32 2016 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Fri May 27 06:58:32 2016 -0700
----------------------------------------------------------------------
.../yarn/server/timeline/EntityCacheItem.java | 66 ++++-
.../timeline/EntityGroupFSTimelineStore.java | 82 ++++--
.../timeline/EntityGroupPlugInForTest.java | 19 +-
.../TestEntityGroupFSTimelineStore.java | 248 +++++++++++++++----
4 files changed, 325 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/705286cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
index dd2a27d..1566ae2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -16,6 +16,8 @@
*/
package org.apache.hadoop.yarn.server.timeline;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Cache item for timeline server v1.5 reader cache. Each cache item has a
@@ -40,12 +43,17 @@ public class EntityCacheItem {
= LoggerFactory.getLogger(EntityCacheItem.class);
private TimelineStore store;
+ private TimelineEntityGroupId groupId;
private EntityGroupFSTimelineStore.AppLogs appLogs;
private long lastRefresh;
private Configuration config;
private FileSystem fs;
+ private int refCount = 0;
+ private static AtomicInteger activeStores = new AtomicInteger(0);
- public EntityCacheItem(Configuration config, FileSystem fs) {
+ public EntityCacheItem(TimelineEntityGroupId gId, Configuration config,
+ FileSystem fs) {
+ this.groupId = gId;
this.config = config;
this.fs = fs;
}
@@ -70,17 +78,24 @@ public class EntityCacheItem {
/**
* @return The timeline store, either loaded or unloaded, of this cache item.
+ * Calling this method does not prevent the storage from being reclaimed.
*/
public synchronized TimelineStore getStore() {
return store;
}
/**
+ * @return The number of currently active stores in all CacheItems.
+ */
+ public static int getActiveStores() {
+ return activeStores.get();
+ }
+
+ /**
* Refresh this cache item if it needs refresh. This will enforce an appLogs
* rescan and then load new data. The refresh process is synchronized with
* other operations on the same cache item.
*
- * @param groupId Group id of the cache
* @param aclManager ACL manager for the timeline storage
* @param jsonFactory JSON factory for the storage
* @param objMapper Object mapper for the storage
@@ -89,10 +104,9 @@ public class EntityCacheItem {
* object filled with all entities in the group.
* @throws IOException
*/
- public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
- TimelineACLsManager aclManager, JsonFactory jsonFactory,
- ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics)
- throws IOException {
+ public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
+ JsonFactory jsonFactory, ObjectMapper objMapper,
+ EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
if (needRefresh()) {
long startTime = Time.monotonicNow();
// If an application is not finished, we only update summary logs (and put
@@ -105,6 +119,7 @@ public class EntityCacheItem {
}
if (!appLogs.getDetailLogs().isEmpty()) {
if (store == null) {
+ activeStores.getAndIncrement();
store = new LevelDBCacheTimelineStore(groupId.toString(),
"LeveldbCache." + groupId);
store.init(config);
@@ -148,11 +163,35 @@ public class EntityCacheItem {
}
/**
- * Release the cache item for the given group id.
+ * Increase the number of references to this cache item by 1.
+ */
+ public synchronized void incrRefs() {
+ refCount++;
+ }
+
+ /**
+ * Unregister a reader. Try to release the cache if the number of references
+ * to the current cache item reaches 0.
*
- * @param groupId the group id that the cache should release
+ * @return true if the cache has been released, otherwise false
*/
- public synchronized void releaseCache(TimelineEntityGroupId groupId) {
+ public synchronized boolean tryRelease() {
+ refCount--;
+ // Only reclaim the storage if there is no reader.
+ if (refCount > 0) {
+ LOG.debug("{} references left for cached group {}, skipping the release",
+ refCount, groupId);
+ return false;
+ }
+ forceRelease();
+ return true;
+ }
+
+ /**
+ * Force releasing the cache item for the given group id, even though there
+ * may be active references.
+ */
+ public synchronized void forceRelease() {
try {
if (store != null) {
store.close();
@@ -161,12 +200,21 @@ public class EntityCacheItem {
LOG.warn("Error closing timeline store", e);
}
store = null;
+ activeStores.getAndDecrement();
+ refCount = 0;
// reset offsets so next time logs are re-parsed
for (LogInfo log : appLogs.getDetailLogs()) {
if (log.getFilename().contains(groupId.toString())) {
log.setOffset(0);
}
}
+ LOG.debug("Cache for group {} released. ", groupId);
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ synchronized int getRefCount() {
+ return refCount;
}
private boolean needRefresh() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/705286cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index edd430c..231ca72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -108,6 +108,10 @@ public class EntityGroupFSTimelineStore extends CompositeService
+ "%04d" + Path.SEPARATOR // app num / 1,000,000
+ "%03d" + Path.SEPARATOR // (app num / 1000) % 1000
+ "%s" + Path.SEPARATOR; // full app id
+ // Indicates when to force release a cache item even if there are active
+ // readers. Enlarging this factor may increase memory usage for the reader
+ // since there may be more cache items "hanging" in memory but not in cache.
+ private static final int CACHE_ITEM_OVERFLOW_FACTOR = 2;
private YarnClient yarnClient;
private TimelineStore summaryStore;
@@ -172,7 +176,15 @@ public class EntityGroupFSTimelineStore extends CompositeService
TimelineEntityGroupId groupId = eldest.getKey();
LOG.debug("Evicting {} due to space limitations", groupId);
EntityCacheItem cacheItem = eldest.getValue();
- cacheItem.releaseCache(groupId);
+ int activeStores = EntityCacheItem.getActiveStores();
+ if (activeStores > appCacheMaxSize * CACHE_ITEM_OVERFLOW_FACTOR) {
+ LOG.debug("Force release cache {} since {} stores are active",
+ groupId, activeStores);
+ cacheItem.forceRelease();
+ } else {
+ LOG.debug("Try release cache {}", groupId);
+ cacheItem.tryRelease();
+ }
if (cacheItem.getAppLogs().isDone()) {
appIdLogMap.remove(groupId.getApplicationId());
}
@@ -826,17 +838,19 @@ public class EntityGroupFSTimelineStore extends CompositeService
@InterfaceAudience.Private
@VisibleForTesting
void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
+ cacheItem.incrRefs();
cachedLogs.put(groupId, cacheItem);
}
private List<TimelineStore> getTimelineStoresFromCacheIds(
- Set<TimelineEntityGroupId> groupIds, String entityType)
+ Set<TimelineEntityGroupId> groupIds, String entityType,
+ List<EntityCacheItem> cacheItems)
throws IOException {
List<TimelineStore> stores = new LinkedList<TimelineStore>();
// For now we just handle one store in a context. We return the first
// non-null storage for the group ids.
for (TimelineEntityGroupId groupId : groupIds) {
- TimelineStore storeForId = getCachedStore(groupId);
+ TimelineStore storeForId = getCachedStore(groupId, cacheItems);
if (storeForId != null) {
LOG.debug("Adding {} as a store for the query", storeForId.getName());
stores.add(storeForId);
@@ -851,8 +865,9 @@ public class EntityGroupFSTimelineStore extends CompositeService
return stores;
}
- private List<TimelineStore> getTimelineStoresForRead(String entityId,
- String entityType) throws IOException {
+ protected List<TimelineStore> getTimelineStoresForRead(String entityId,
+ String entityType, List<EntityCacheItem> cacheItems)
+ throws IOException {
Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
LOG.debug("Trying plugin {} for id {} and type {}",
@@ -871,12 +886,12 @@ public class EntityGroupFSTimelineStore extends CompositeService
cacheIdPlugin.getClass().getName());
}
}
- return getTimelineStoresFromCacheIds(groupIds, entityType);
+ return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
}
private List<TimelineStore> getTimelineStoresForRead(String entityType,
- NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
- throws IOException {
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+ List<EntityCacheItem> cacheItems) throws IOException {
Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
Set<TimelineEntityGroupId> idsFromPlugin =
@@ -888,24 +903,26 @@ public class EntityGroupFSTimelineStore extends CompositeService
groupIds.addAll(idsFromPlugin);
}
}
- return getTimelineStoresFromCacheIds(groupIds, entityType);
+ return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
}
// find a cached timeline store or null if it cannot be located
- private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
- throws IOException {
+ private TimelineStore getCachedStore(TimelineEntityGroupId groupId,
+ List<EntityCacheItem> cacheItems) throws IOException {
EntityCacheItem cacheItem;
synchronized (this.cachedLogs) {
// Note that the content in the cache log storage may be stale.
cacheItem = this.cachedLogs.get(groupId);
if (cacheItem == null) {
LOG.debug("Set up new cache item for id {}", groupId);
- cacheItem = new EntityCacheItem(getConfig(), fs);
+ cacheItem = new EntityCacheItem(groupId, getConfig(), fs);
AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
if (appLogs != null) {
LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
cacheItem.setAppLogs(appLogs);
this.cachedLogs.put(groupId, cacheItem);
+ // Add the reference by the cache
+ cacheItem.incrRefs();
} else {
LOG.warn("AppLogs for groupId {} is set to null!", groupId);
}
@@ -915,30 +932,43 @@ public class EntityGroupFSTimelineStore extends CompositeService
if (cacheItem.getAppLogs() != null) {
AppLogs appLogs = cacheItem.getAppLogs();
LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
- store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
- objMapper, metrics);
+ // Add the reference by the store
+ cacheItem.incrRefs();
+ cacheItems.add(cacheItem);
+ store = cacheItem.refreshCache(aclManager, jsonFactory, objMapper,
+ metrics);
} else {
LOG.warn("AppLogs for group id {} is null", groupId);
}
return store;
}
+ protected void tryReleaseCacheItems(List<EntityCacheItem> relatedCacheItems) {
+ for (EntityCacheItem item : relatedCacheItems) {
+ item.tryRelease();
+ }
+ }
+
@Override
public TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
+ List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
List<TimelineStore> stores = getTimelineStoresForRead(entityType,
- primaryFilter, secondaryFilters);
+ primaryFilter, secondaryFilters, relatedCacheItems);
TimelineEntities returnEntities = new TimelineEntities();
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {} for the request", store.getName());
- returnEntities.addEntities(
- store.getEntities(entityType, limit, windowStart, windowEnd, fromId,
- fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve,
- checkAcl).getEntities());
+ TimelineEntities entities = store.getEntities(entityType, limit,
+ windowStart, windowEnd, fromId, fromTs, primaryFilter,
+ secondaryFilters, fieldsToRetrieve, checkAcl);
+ if (entities != null) {
+ returnEntities.addEntities(entities.getEntities());
+ }
}
+ tryReleaseCacheItems(relatedCacheItems);
return returnEntities;
}
@@ -946,17 +976,21 @@ public class EntityGroupFSTimelineStore extends CompositeService
public TimelineEntity getEntity(String entityId, String entityType,
EnumSet<Field> fieldsToRetrieve) throws IOException {
LOG.debug("getEntity type={} id={}", entityType, entityId);
- List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType);
+ List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
+ List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType,
+ relatedCacheItems);
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {}:{} for the request", store.getName(),
store.toString());
TimelineEntity e =
store.getEntity(entityId, entityType, fieldsToRetrieve);
if (e != null) {
+ tryReleaseCacheItems(relatedCacheItems);
return e;
}
}
LOG.debug("getEntity: Found nothing");
+ tryReleaseCacheItems(relatedCacheItems);
return null;
}
@@ -966,10 +1000,11 @@ public class EntityGroupFSTimelineStore extends CompositeService
Long windowEnd, Set<String> eventTypes) throws IOException {
LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
TimelineEvents returnEvents = new TimelineEvents();
+ List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
for (String entityId : entityIds) {
LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
List<TimelineStore> stores
- = getTimelineStoresForRead(entityId, entityType);
+ = getTimelineStoresForRead(entityId, entityType, relatedCacheItems);
for (TimelineStore store : stores) {
LOG.debug("Try timeline store {}:{} for the request", store.getName(),
store.toString());
@@ -978,9 +1013,12 @@ public class EntityGroupFSTimelineStore extends CompositeService
TimelineEvents events =
store.getEntityTimelines(entityType, entityIdSet, limit,
windowStart, windowEnd, eventTypes);
- returnEvents.addEvents(events.getAllEvents());
+ if (events != null) {
+ returnEvents.addEvents(events.getAllEvents());
+ }
}
}
+ tryReleaseCacheItems(relatedCacheItems);
return returnEvents;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/705286cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
index 71e26cb..db241a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.timeline;
import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import java.util.Collection;
import java.util.Set;
@@ -26,31 +28,32 @@ import java.util.SortedSet;
class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {
- private static TimelineEntityGroupId timelineEntityGroupId
- = TimelineEntityGroupId.newInstance(
- TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test");
+ static final String APP_ID_FILTER_NAME = "appid";
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
NameValuePair primaryFilter,
Collection<NameValuePair> secondaryFilters) {
- return Sets.newHashSet(timelineEntityGroupId);
+ ApplicationId appId
+ = ConverterUtils.toApplicationId(primaryFilter.getValue().toString());
+ return Sets.newHashSet(getStandardTimelineGroupId(appId));
}
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
String entityType) {
- return Sets.newHashSet(timelineEntityGroupId);
+ ApplicationId appId = ConverterUtils.toApplicationId(entityId);
+ return Sets.newHashSet(getStandardTimelineGroupId(appId));
}
@Override
public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
SortedSet<String> entityIds,
Set<String> eventTypes) {
- return Sets.newHashSet(timelineEntityGroupId);
+ return Sets.newHashSet();
}
- static TimelineEntityGroupId getStandardTimelineGroupId() {
- return timelineEntityGroupId;
+ static TimelineEntityGroupId getStandardTimelineGroupId(ApplicationId appId) {
+ return TimelineEntityGroupId.newInstance(appId, "test");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/705286cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index 7a8ff2f..d6baab6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -44,8 +44,17 @@ import org.junit.Test;
import org.junit.rules.TestName;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import static org.junit.Assert.assertEquals;
@@ -53,24 +62,15 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
- private static final String SAMPLE_APP_NAME = "1234_5678";
+ private static final String SAMPLE_APP_PREFIX_CACHE_TEST = "1234_000";
+ private static final int CACHE_TEST_CACHE_SIZE = 5;
- static final ApplicationId TEST_APPLICATION_ID
- = ConverterUtils.toApplicationId(
- ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME);
-
- private static final String TEST_APP_DIR_NAME
- = TEST_APPLICATION_ID.toString();
- private static final String TEST_ATTEMPT_DIR_NAME
- = ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1";
private static final String TEST_SUMMARY_LOG_FILE_NAME
= EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
- private static final String TEST_ENTITY_LOG_FILE_NAME
- = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
- + EntityGroupPlugInForTest.getStandardTimelineGroupId();
private static final String TEST_DOMAIN_LOG_FILE_NAME
= EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";
@@ -78,9 +78,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
= new Path(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestEntityGroupFSTimelineStore.class.getSimpleName());
- private static Path testAppDirPath;
- private static Path testAttemptDirPath;
- private static Path testDoneDirPath;
private static Configuration config = new YarnConfiguration();
private static MiniDFSCluster hdfsCluster;
@@ -88,7 +85,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
private static FileContext fc;
private static FileContextTestHelper fileContextTestHelper =
new FileContextTestHelper("/tmp/TestEntityGroupFSTimelineStore");
- private EntityGroupFSTimelineStore store;
+
+ private static List<ApplicationId> sampleAppIds;
+ private static ApplicationId mainTestAppId;
+ private static Path mainTestAppDirPath;
+ private static Path testDoneDirPath;
+ private static String mainEntityLogFileName;
+
+ private EntityGroupFSTimelineStoreForTest store;
private TimelineEntity entityNew;
@Rule
@@ -101,23 +105,44 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
"YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
+ config.setInt(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
+ CACHE_TEST_CACHE_SIZE);
config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
hdfsCluster
= new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
fs = hdfsCluster.getFileSystem();
fc = FileContext.getFileContext(hdfsCluster.getURI(0), config);
- testAppDirPath = getTestRootPath(TEST_APPLICATION_ID.toString());
- testAttemptDirPath = new Path(testAppDirPath, TEST_ATTEMPT_DIR_NAME);
- testDoneDirPath = getTestRootPath("done");
- config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, testDoneDirPath.toString());
+ sampleAppIds = new ArrayList<>(CACHE_TEST_CACHE_SIZE + 1);
+ for (int i = 0; i < CACHE_TEST_CACHE_SIZE + 1; i++) {
+ ApplicationId appId = ConverterUtils.toApplicationId(
+ ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_PREFIX_CACHE_TEST
+ + i);
+ sampleAppIds.add(appId);
+ }
+ // Among all sample applicationIds, choose the first one for most of the
+ // tests.
+ mainTestAppId = sampleAppIds.get(0);
+ mainTestAppDirPath = getTestRootPath(mainTestAppId.toString());
+ mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ + EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
+
+ testDoneDirPath = getTestRootPath("done");
+ config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+ testDoneDirPath.toString());
}
@Before
public void setup() throws Exception {
- createTestFiles();
- store = new EntityGroupFSTimelineStore();
+ for (ApplicationId appId : sampleAppIds) {
+ Path attemotDirPath = new Path(getTestRootPath(appId.toString()),
+ getAttemptDirName(appId));
+ createTestFiles(appId, attemotDirPath);
+ }
+
+ store = new EntityGroupFSTimelineStoreForTest();
if (currTestName.getMethodName().contains("Plugin")) {
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
EntityGroupPlugInForTest.class.getName());
@@ -130,7 +155,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
@After
public void tearDown() throws Exception {
store.stop();
- fs.delete(testAppDirPath, true);
+ for (ApplicationId appId : sampleAppIds) {
+ fs.delete(getTestRootPath(appId.toString()), true);
+ }
}
@AfterClass
@@ -144,7 +171,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
@Test
public void testAppLogsScanLogs() throws Exception {
EntityGroupFSTimelineStore.AppLogs appLogs =
- store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+ store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
appLogs.scanForLogs();
List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
@@ -160,14 +187,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
for (LogInfo log : detailLogs) {
String fileName = log.getFilename();
- assertEquals(fileName, TEST_ENTITY_LOG_FILE_NAME);
+ assertEquals(fileName, mainEntityLogFileName);
}
}
@Test
public void testMoveToDone() throws Exception {
EntityGroupFSTimelineStore.AppLogs appLogs =
- store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+ store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
Path pathBefore = appLogs.getAppDirPath();
appLogs.moveToDone();
@@ -182,7 +209,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary();
long beforeScan = scanned.value();
EntityGroupFSTimelineStore.AppLogs appLogs =
- store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+ store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
appLogs.scanForLogs();
appLogs.parseSummaryLogs(tdm);
@@ -194,6 +221,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
public void testCleanLogs() throws Exception {
// Create test dirs and files
// Irrelevant file, should not be reclaimed
+ String appDirName = mainTestAppId.toString();
+ String attemptDirName = ApplicationAttemptId.appAttemptIdStrPrefix
+ + appDirName + "_1";
Path irrelevantFilePath = new Path(
testDoneDirPath, "irrelevant.log");
FSDataOutputStream stream = fs.create(irrelevantFilePath);
@@ -204,29 +234,29 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001");
// First application, untouched after creation
- Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME);
- Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME);
+ Path appDirClean = new Path(doneAppHomeDir, appDirName);
+ Path attemptDirClean = new Path(appDirClean, attemptDirName);
fs.mkdirs(attemptDirClean);
Path filePath = new Path(attemptDirClean, "test.log");
stream = fs.create(filePath);
stream.close();
// Second application, one file touched after creation
- Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1");
+ Path appDirHoldByFile = new Path(doneAppHomeDir, appDirName + "1");
Path attemptDirHoldByFile
- = new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME);
+ = new Path(appDirHoldByFile, attemptDirName);
fs.mkdirs(attemptDirHoldByFile);
Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
stream = fs.create(filePathHold);
stream.close();
// Third application, one dir touched after creation
- Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2");
- Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME);
+ Path appDirHoldByDir = new Path(doneAppHomeDir, appDirName + "2");
+ Path attemptDirHoldByDir = new Path(appDirHoldByDir, attemptDirName);
fs.mkdirs(attemptDirHoldByDir);
Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
fs.mkdirs(dirPathHold);
// Fourth application, empty dirs
- Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3");
- Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME);
+ Path appDirEmpty = new Path(doneAppHomeDir, appDirName + "3");
+ Path attemptDirEmpty = new Path(appDirEmpty, attemptDirName);
fs.mkdirs(attemptDirEmpty);
Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
fs.mkdirs(dirPathEmpty);
@@ -274,12 +304,15 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
// Load data and cache item, prepare timeline store by making a cache item
EntityGroupFSTimelineStore.AppLogs appLogs =
- store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+ store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
- EntityCacheItem cacheItem = new EntityCacheItem(config, fs);
+ EntityCacheItem cacheItem = new EntityCacheItem(
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+ config, fs);
cacheItem.setAppLogs(appLogs);
store.setCachedLogs(
- EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+ cacheItem);
MutableCounterLong detailLogEntityRead =
store.metrics.getGetEntityToDetailOps();
MutableStat cacheRefresh = store.metrics.getCacheRefresh();
@@ -291,16 +324,20 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
= PluginStoreTestUtils.getTdmWithStore(config, store);
// Verify single entity read
- TimelineEntity entity3 = tdm.getEntity("type_3", "id_3",
+ TimelineEntity entity3 = tdm.getEntity("type_3", mainTestAppId.toString(),
EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity3);
assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+ assertEquals(1, cacheItem.getRefCount());
+ assertEquals(1, EntityCacheItem.getActiveStores());
// Verify multiple entities read
- TimelineEntities entities = tdm.getEntities("type_3", null, null, null,
- null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
+ NameValuePair primaryFilter = new NameValuePair(
+ EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString());
+ TimelineEntities entities = tdm.getEntities("type_3", primaryFilter, null,
+ null, null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
- assertEquals(entities.getEntities().size(), 1);
+ assertEquals(1, entities.getEntities().size());
for (TimelineEntity entity : entities.getEntities()) {
assertEquals(entityNew.getStartTime(), entity.getStartTime());
}
@@ -309,11 +346,79 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
}
+ /**
+  * Exercises a read that is still in flight while further cache items are
+  * added to the store.
+  *
+  * A reader thread pauses inside {@code getEntityBlocking} after it has
+  * obtained its stores; the main thread then caches one item for every other
+  * sample application, checks that the first item still has a reference
+  * count of 1, lets the reader continue, and finally checks that the entity
+  * was read and the reference count dropped back to 0.
+  */
+ @Test(timeout = 90000L)
+ public void testMultiplePluginRead() throws Exception {
+   Thread.currentThread().setName("testMain");
+   // Verify precondition
+   assertEquals(EntityGroupPlugInForTest.class.getName(),
+       store.getConfig().get(
+           YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
+   // Prepare timeline store by making cache items
+   EntityGroupFSTimelineStore.AppLogs appLogs =
+       store.new AppLogs(mainTestAppId, mainTestAppDirPath,
+           AppState.COMPLETED);
+   final EntityCacheItem cacheItem = new EntityCacheItem(
+       EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+       config, fs);
+
+   cacheItem.setAppLogs(appLogs);
+   store.setCachedLogs(
+       EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+       cacheItem);
+
+   ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
+   try {
+     // Launch the blocking read call in a future
+     FutureTask<TimelineEntity> blockingReader =
+         new FutureTask<>(new Callable<TimelineEntity>() {
+           @Override
+           public TimelineEntity call() throws Exception {
+             Thread.currentThread().setName("blockingReader");
+             return store.getEntityBlocking(mainTestAppId.toString(),
+                 "type_3", EnumSet.allOf(TimelineReader.Field.class));
+           }
+         });
+     threadExecutor.execute(blockingReader);
+
+     // Wait until the reader signals that it holds its stores. An
+     // InterruptedException simply propagates and fails the test, since the
+     // method already declares "throws Exception".
+     while (!store.testCacheReferenced) {
+       Thread.sleep(300);
+     }
+
+     // Try to refill the cache after the first cache item is referenced
+     for (ApplicationId appId : sampleAppIds) {
+       // Skip the main test appId since it's already in cache
+       if (appId.equals(mainTestAppId)) {
+         continue;
+       }
+       EntityGroupFSTimelineStore.AppLogs currAppLog =
+           store.new AppLogs(appId, getTestRootPath(appId.toString()),
+               AppState.COMPLETED);
+       EntityCacheItem item = new EntityCacheItem(
+           EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
+           config, fs);
+       item.setAppLogs(currAppLog);
+       store.setCachedLogs(
+           EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
+           item);
+     }
+     // At this time, the cache item of the blocking reader should be
+     // evicted, yet the pending read must still hold its one reference.
+     assertEquals(1, cacheItem.getRefCount());
+     store.testCanProceed = true;
+     TimelineEntity entity3 = blockingReader.get();
+
+     assertNotNull(entity3);
+     assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+     assertEquals(0, cacheItem.getRefCount());
+   } finally {
+     // Always stop the reader thread, even when an assertion above failed;
+     // shutdownNow() interrupts a reader that is still sleeping.
+     threadExecutor.shutdownNow();
+   }
+ }
+
@Test
public void testSummaryRead() throws Exception {
// Load data
EntityGroupFSTimelineStore.AppLogs appLogs =
- store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+ store.new AppLogs(mainTestAppId, mainTestAppDirPath,
AppState.COMPLETED);
MutableCounterLong summaryLogEntityRead
= store.metrics.getGetEntityToSummaryOps();
@@ -331,28 +436,32 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
UserGroupInformation.getLoginUser());
assertEquals(entities.getEntities().size(), 1);
for (TimelineEntity entity : entities.getEntities()) {
- assertEquals((Long) 123l, entity.getStartTime());
+ assertEquals((Long) 123L, entity.getStartTime());
}
// Verify metrics
assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value());
}
- private void createTestFiles() throws IOException {
+ private void createTestFiles(ApplicationId appId, Path attemptDirPath)
+ throws IOException {
TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
PluginStoreTestUtils.writeEntities(entities,
- new Path(testAttemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
-
+ new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
+ Map<String, Set<Object>> primaryFilters = new HashMap<>();
+ Set<Object> appSet = new HashSet<Object>();
+ appSet.add(appId.toString());
+ primaryFilters.put(EntityGroupPlugInForTest.APP_ID_FILTER_NAME, appSet);
entityNew = PluginStoreTestUtils
- .createEntity("id_3", "type_3", 789l, null, null,
- null, null, "domain_id_1");
+ .createEntity(appId.toString(), "type_3", 789L, null, null,
+ primaryFilters, null, "domain_id_1");
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
PluginStoreTestUtils.writeEntities(entityList,
- new Path(testAttemptDirPath, TEST_ENTITY_LOG_FILE_NAME), fs);
+ new Path(attemptDirPath, mainEntityLogFileName), fs);
FSDataOutputStream out = fs.create(
- new Path(testAttemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
+ new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
out.close();
}
@@ -360,4 +469,41 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
return fileContextTestHelper.getTestRootPath(fc, pathString);
}
+ /**
+  * Builds the attempt directory name for an application: the application
+  * attempt id prefix, followed by the application id string and "_1".
+  *
+  * @param appId application the directory name is derived from
+  * @return the attempt directory name
+  */
+ private static String getAttemptDirName(ApplicationId appId) {
+   final String appIdText = appId.toString();
+   final String attemptPrefix = ApplicationAttemptId.appAttemptIdStrPrefix;
+   return attemptPrefix + appIdText + "_1";
+ }
+
+ /**
+  * Test-only store whose read can be paused after the stores for the read
+  * have been obtained, so that a test can change the cache while the read
+  * is still pending.
+  */
+ private static class EntityGroupFSTimelineStoreForTest
+     extends EntityGroupFSTimelineStore {
+   // Flags used for the concurrent testing environment
+   // Set by the test to let a paused getEntityBlocking() call continue.
+   private volatile boolean testCanProceed = false;
+   // Set by getEntityBlocking() once its stores have been obtained.
+   private volatile boolean testCacheReferenced = false;
+
+   /**
+    * Reads one entity, pausing between obtaining the stores and querying
+    * them until {@code testCanProceed} becomes true.
+    *
+    * @param entityId id of the entity to read
+    * @param entityType type of the entity to read
+    * @param fieldsToRetrieve fields passed through to each store
+    * @return the first non-null entity returned by a store, or null if no
+    *     store returns one
+    * @throws IOException propagated from the underlying store calls
+    */
+   TimelineEntity getEntityBlocking(String entityId, String entityType,
+       EnumSet<Field> fieldsToRetrieve) throws IOException {
+     List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
+     List<TimelineStore> stores = getTimelineStoresForRead(entityId,
+         entityType, relatedCacheItems);
+     try {
+       testCacheReferenced = true;
+       try {
+         while (!testCanProceed) {
+           Thread.sleep(1000);
+         }
+       } catch (InterruptedException e) {
+         // Keep the interrupt visible to the executor before failing.
+         Thread.currentThread().interrupt();
+         fail("Interrupted " + e);
+       }
+
+       for (TimelineStore timelineStore : stores) {
+         TimelineEntity entity =
+             timelineStore.getEntity(entityId, entityType, fieldsToRetrieve);
+         if (entity != null) {
+           return entity;
+         }
+       }
+       return null;
+     } finally {
+       // Release on every exit path, including a store that throws or the
+       // AssertionError raised by fail(), so no cache item stays referenced.
+       tryReleaseCacheItems(relatedCacheItems);
+     }
+   }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org