You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/01 19:55:12 UTC

[11/39] hadoop git commit: YARN-4987. Read cache concurrency issue between read and evict in EntityGroupFS timeline store. Contributed by Li Lu.

YARN-4987. Read cache concurrency issue between read and evict in EntityGroupFS timeline store. Contributed by Li Lu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/705286cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/705286cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/705286cc

Branch: refs/heads/HDFS-1312
Commit: 705286ccaeea36941d97ec1c1700746b74264924
Parents: bde819a
Author: Junping Du <ju...@apache.org>
Authored: Fri May 27 06:58:32 2016 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Fri May 27 06:58:32 2016 -0700

----------------------------------------------------------------------
 .../yarn/server/timeline/EntityCacheItem.java   |  66 ++++-
 .../timeline/EntityGroupFSTimelineStore.java    |  82 ++++--
 .../timeline/EntityGroupPlugInForTest.java      |  19 +-
 .../TestEntityGroupFSTimelineStore.java         | 248 +++++++++++++++----
 4 files changed, 325 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/705286cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
index dd2a27d..1566ae2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.yarn.server.timeline;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Cache item for timeline server v1.5 reader cache. Each cache item has a
@@ -40,12 +43,17 @@ public class EntityCacheItem {
       = LoggerFactory.getLogger(EntityCacheItem.class);
 
   private TimelineStore store;
+  private TimelineEntityGroupId groupId;
   private EntityGroupFSTimelineStore.AppLogs appLogs;
   private long lastRefresh;
   private Configuration config;
   private FileSystem fs;
+  private int refCount = 0;
+  private static AtomicInteger activeStores = new AtomicInteger(0);
 
-  public EntityCacheItem(Configuration config, FileSystem fs) {
+  public EntityCacheItem(TimelineEntityGroupId gId, Configuration config,
+      FileSystem fs) {
+    this.groupId = gId;
     this.config = config;
     this.fs = fs;
   }
@@ -70,17 +78,24 @@ public class EntityCacheItem {
 
   /**
    * @return The timeline store, either loaded or unloaded, of this cache item.
+   * This method will not hold the storage from being reclaimed.
    */
   public synchronized TimelineStore getStore() {
     return store;
   }
 
   /**
+   * @return The number of currently active stores in all CacheItems.
+   */
+  public static int getActiveStores() {
+    return activeStores.get();
+  }
+
+  /**
    * Refresh this cache item if it needs refresh. This will enforce an appLogs
    * rescan and then load new data. The refresh process is synchronized with
    * other operations on the same cache item.
    *
-   * @param groupId Group id of the cache
    * @param aclManager ACL manager for the timeline storage
    * @param jsonFactory JSON factory for the storage
    * @param objMapper Object mapper for the storage
@@ -89,10 +104,9 @@ public class EntityCacheItem {
    *         object filled with all entities in the group.
    * @throws IOException
    */
-  public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
-      TimelineACLsManager aclManager, JsonFactory jsonFactory,
-      ObjectMapper objMapper, EntityGroupFSTimelineStoreMetrics metrics)
-      throws IOException {
+  public synchronized TimelineStore refreshCache(TimelineACLsManager aclManager,
+      JsonFactory jsonFactory, ObjectMapper objMapper,
+      EntityGroupFSTimelineStoreMetrics metrics) throws IOException {
     if (needRefresh()) {
       long startTime = Time.monotonicNow();
       // If an application is not finished, we only update summary logs (and put
@@ -105,6 +119,7 @@ public class EntityCacheItem {
       }
       if (!appLogs.getDetailLogs().isEmpty()) {
         if (store == null) {
+          activeStores.getAndIncrement();
           store = new LevelDBCacheTimelineStore(groupId.toString(),
               "LeveldbCache." + groupId);
           store.init(config);
@@ -148,11 +163,35 @@ public class EntityCacheItem {
   }
 
   /**
-   * Release the cache item for the given group id.
+   * Increase the number of references to this cache item by 1.
+   */
+  public synchronized void incrRefs() {
+    refCount++;
+  }
+
+  /**
+   * Unregister a reader. Try to release the cache if the reader to current
+   * cache reaches 0.
    *
-   * @param groupId the group id that the cache should release
+   * @return true if the cache has been released, otherwise false
    */
-  public synchronized void releaseCache(TimelineEntityGroupId groupId) {
+  public synchronized boolean tryRelease() {
+    refCount--;
+    // Only reclaim the storage if there is no reader.
+    if (refCount > 0) {
+      LOG.debug("{} references left for cached group {}, skipping the release",
+          refCount, groupId);
+      return false;
+    }
+    forceRelease();
+    return true;
+  }
+
+  /**
+   * Force releasing the cache item for the given group id, even though there
+   * may be active references.
+   */
+  public synchronized void forceRelease() {
     try {
       if (store != null) {
         store.close();
@@ -161,12 +200,21 @@ public class EntityCacheItem {
       LOG.warn("Error closing timeline store", e);
     }
     store = null;
+    activeStores.getAndDecrement();
+    refCount = 0;
     // reset offsets so next time logs are re-parsed
     for (LogInfo log : appLogs.getDetailLogs()) {
       if (log.getFilename().contains(groupId.toString())) {
         log.setOffset(0);
       }
     }
+    LOG.debug("Cache for group {} released. ", groupId);
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  synchronized int getRefCount() {
+    return refCount;
   }
 
   private boolean needRefresh() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/705286cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index edd430c..231ca72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -108,6 +108,10 @@ public class EntityGroupFSTimelineStore extends CompositeService
           + "%04d" + Path.SEPARATOR // app num / 1,000,000
           + "%03d" + Path.SEPARATOR // (app num / 1000) % 1000
           + "%s" + Path.SEPARATOR; // full app id
+  // Indicates when to force release a cache item even if there are active
+  // readers. Enlarge this factor may increase memory usage for the reader since
+  // there may be more cache items "hanging" in memory but not in cache.
+  private static final int CACHE_ITEM_OVERFLOW_FACTOR = 2;
 
   private YarnClient yarnClient;
   private TimelineStore summaryStore;
@@ -172,7 +176,15 @@ public class EntityGroupFSTimelineStore extends CompositeService
               TimelineEntityGroupId groupId = eldest.getKey();
               LOG.debug("Evicting {} due to space limitations", groupId);
               EntityCacheItem cacheItem = eldest.getValue();
-              cacheItem.releaseCache(groupId);
+              int activeStores = EntityCacheItem.getActiveStores();
+              if (activeStores > appCacheMaxSize * CACHE_ITEM_OVERFLOW_FACTOR) {
+                LOG.debug("Force release cache {} since {} stores are active",
+                    groupId, activeStores);
+                cacheItem.forceRelease();
+              } else {
+                LOG.debug("Try release cache {}", groupId);
+                cacheItem.tryRelease();
+              }
               if (cacheItem.getAppLogs().isDone()) {
                 appIdLogMap.remove(groupId.getApplicationId());
               }
@@ -826,17 +838,19 @@ public class EntityGroupFSTimelineStore extends CompositeService
   @InterfaceAudience.Private
   @VisibleForTesting
   void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
+    cacheItem.incrRefs();
     cachedLogs.put(groupId, cacheItem);
   }
 
   private List<TimelineStore> getTimelineStoresFromCacheIds(
-      Set<TimelineEntityGroupId> groupIds, String entityType)
+      Set<TimelineEntityGroupId> groupIds, String entityType,
+      List<EntityCacheItem> cacheItems)
       throws IOException {
     List<TimelineStore> stores = new LinkedList<TimelineStore>();
     // For now we just handle one store in a context. We return the first
     // non-null storage for the group ids.
     for (TimelineEntityGroupId groupId : groupIds) {
-      TimelineStore storeForId = getCachedStore(groupId);
+      TimelineStore storeForId = getCachedStore(groupId, cacheItems);
       if (storeForId != null) {
         LOG.debug("Adding {} as a store for the query", storeForId.getName());
         stores.add(storeForId);
@@ -851,8 +865,9 @@ public class EntityGroupFSTimelineStore extends CompositeService
     return stores;
   }
 
-  private List<TimelineStore> getTimelineStoresForRead(String entityId,
-      String entityType) throws IOException {
+  protected List<TimelineStore> getTimelineStoresForRead(String entityId,
+      String entityType, List<EntityCacheItem> cacheItems)
+      throws IOException {
     Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
     for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
       LOG.debug("Trying plugin {} for id {} and type {}",
@@ -871,12 +886,12 @@ public class EntityGroupFSTimelineStore extends CompositeService
             cacheIdPlugin.getClass().getName());
       }
     }
-    return getTimelineStoresFromCacheIds(groupIds, entityType);
+    return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
   }
 
   private List<TimelineStore> getTimelineStoresForRead(String entityType,
-      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
-      throws IOException {
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      List<EntityCacheItem> cacheItems) throws IOException {
     Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
     for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
       Set<TimelineEntityGroupId> idsFromPlugin =
@@ -888,24 +903,26 @@ public class EntityGroupFSTimelineStore extends CompositeService
         groupIds.addAll(idsFromPlugin);
       }
     }
-    return getTimelineStoresFromCacheIds(groupIds, entityType);
+    return getTimelineStoresFromCacheIds(groupIds, entityType, cacheItems);
   }
 
   // find a cached timeline store or null if it cannot be located
-  private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
-      throws IOException {
+  private TimelineStore getCachedStore(TimelineEntityGroupId groupId,
+      List<EntityCacheItem> cacheItems) throws IOException {
     EntityCacheItem cacheItem;
     synchronized (this.cachedLogs) {
       // Note that the content in the cache log storage may be stale.
       cacheItem = this.cachedLogs.get(groupId);
       if (cacheItem == null) {
         LOG.debug("Set up new cache item for id {}", groupId);
-        cacheItem = new EntityCacheItem(getConfig(), fs);
+        cacheItem = new EntityCacheItem(groupId, getConfig(), fs);
         AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
         if (appLogs != null) {
           LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
           cacheItem.setAppLogs(appLogs);
           this.cachedLogs.put(groupId, cacheItem);
+          // Add the reference by the cache
+          cacheItem.incrRefs();
         } else {
           LOG.warn("AppLogs for groupId {} is set to null!", groupId);
         }
@@ -915,30 +932,43 @@ public class EntityGroupFSTimelineStore extends CompositeService
     if (cacheItem.getAppLogs() != null) {
       AppLogs appLogs = cacheItem.getAppLogs();
       LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
-      store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
-          objMapper, metrics);
+      // Add the reference by the store
+      cacheItem.incrRefs();
+      cacheItems.add(cacheItem);
+      store = cacheItem.refreshCache(aclManager, jsonFactory, objMapper,
+          metrics);
     } else {
       LOG.warn("AppLogs for group id {} is null", groupId);
     }
     return store;
   }
 
+  protected void tryReleaseCacheItems(List<EntityCacheItem> relatedCacheItems) {
+    for (EntityCacheItem item : relatedCacheItems) {
+      item.tryRelease();
+    }
+  }
+
   @Override
   public TimelineEntities getEntities(String entityType, Long limit,
       Long windowStart, Long windowEnd, String fromId, Long fromTs,
       NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
       EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
     LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
+    List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
     List<TimelineStore> stores = getTimelineStoresForRead(entityType,
-        primaryFilter, secondaryFilters);
+        primaryFilter, secondaryFilters, relatedCacheItems);
     TimelineEntities returnEntities = new TimelineEntities();
     for (TimelineStore store : stores) {
       LOG.debug("Try timeline store {} for the request", store.getName());
-      returnEntities.addEntities(
-          store.getEntities(entityType, limit, windowStart, windowEnd, fromId,
-              fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve,
-              checkAcl).getEntities());
+      TimelineEntities entities = store.getEntities(entityType, limit,
+          windowStart, windowEnd, fromId, fromTs, primaryFilter,
+          secondaryFilters, fieldsToRetrieve, checkAcl);
+      if (entities != null) {
+        returnEntities.addEntities(entities.getEntities());
+      }
     }
+    tryReleaseCacheItems(relatedCacheItems);
     return returnEntities;
   }
 
@@ -946,17 +976,21 @@ public class EntityGroupFSTimelineStore extends CompositeService
   public TimelineEntity getEntity(String entityId, String entityType,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
     LOG.debug("getEntity type={} id={}", entityType, entityId);
-    List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType);
+    List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
+    List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType,
+        relatedCacheItems);
     for (TimelineStore store : stores) {
       LOG.debug("Try timeline store {}:{} for the request", store.getName(),
           store.toString());
       TimelineEntity e =
           store.getEntity(entityId, entityType, fieldsToRetrieve);
       if (e != null) {
+        tryReleaseCacheItems(relatedCacheItems);
         return e;
       }
     }
     LOG.debug("getEntity: Found nothing");
+    tryReleaseCacheItems(relatedCacheItems);
     return null;
   }
 
@@ -966,10 +1000,11 @@ public class EntityGroupFSTimelineStore extends CompositeService
       Long windowEnd, Set<String> eventTypes) throws IOException {
     LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
     TimelineEvents returnEvents = new TimelineEvents();
+    List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
     for (String entityId : entityIds) {
       LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
       List<TimelineStore> stores
-          = getTimelineStoresForRead(entityId, entityType);
+          = getTimelineStoresForRead(entityId, entityType, relatedCacheItems);
       for (TimelineStore store : stores) {
         LOG.debug("Try timeline store {}:{} for the request", store.getName(),
             store.toString());
@@ -978,9 +1013,12 @@ public class EntityGroupFSTimelineStore extends CompositeService
         TimelineEvents events =
             store.getEntityTimelines(entityType, entityIdSet, limit,
                 windowStart, windowEnd, eventTypes);
-        returnEvents.addEvents(events.getAllEvents());
+        if (events != null) {
+          returnEvents.addEvents(events.getAllEvents());
+        }
       }
     }
+    tryReleaseCacheItems(relatedCacheItems);
     return returnEvents;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/705286cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
index 71e26cb..db241a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.yarn.server.timeline;
 
 import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import java.util.Collection;
 import java.util.Set;
@@ -26,31 +28,32 @@ import java.util.SortedSet;
 
 class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {
 
-  private static TimelineEntityGroupId timelineEntityGroupId
-      = TimelineEntityGroupId.newInstance(
-      TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test");
+  static final String APP_ID_FILTER_NAME = "appid";
 
   @Override
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
       NameValuePair primaryFilter,
       Collection<NameValuePair> secondaryFilters) {
-    return Sets.newHashSet(timelineEntityGroupId);
+    ApplicationId appId
+        = ConverterUtils.toApplicationId(primaryFilter.getValue().toString());
+    return Sets.newHashSet(getStandardTimelineGroupId(appId));
   }
 
   @Override
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
       String entityType) {
-    return Sets.newHashSet(timelineEntityGroupId);
+    ApplicationId appId = ConverterUtils.toApplicationId(entityId);
+    return Sets.newHashSet(getStandardTimelineGroupId(appId));
   }
 
   @Override
   public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
       SortedSet<String> entityIds,
       Set<String> eventTypes) {
-    return Sets.newHashSet(timelineEntityGroupId);
+    return Sets.newHashSet();
   }
 
-  static TimelineEntityGroupId getStandardTimelineGroupId() {
-    return timelineEntityGroupId;
+  static TimelineEntityGroupId getStandardTimelineGroupId(ApplicationId appId) {
+    return TimelineEntityGroupId.newInstance(appId, "test");
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/705286cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index 7a8ff2f..d6baab6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -44,8 +44,17 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 
 import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
 import static org.junit.Assert.assertEquals;
@@ -53,24 +62,15 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
 
-  private static final String SAMPLE_APP_NAME = "1234_5678";
+  private static final String SAMPLE_APP_PREFIX_CACHE_TEST = "1234_000";
+  private static final int CACHE_TEST_CACHE_SIZE = 5;
 
-  static final ApplicationId TEST_APPLICATION_ID
-      = ConverterUtils.toApplicationId(
-      ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME);
-
-  private static final String TEST_APP_DIR_NAME
-      = TEST_APPLICATION_ID.toString();
-  private static final String TEST_ATTEMPT_DIR_NAME
-      = ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1";
   private static final String TEST_SUMMARY_LOG_FILE_NAME
       = EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
-  private static final String TEST_ENTITY_LOG_FILE_NAME
-      = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
-          + EntityGroupPlugInForTest.getStandardTimelineGroupId();
   private static final String TEST_DOMAIN_LOG_FILE_NAME
       = EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";
 
@@ -78,9 +78,6 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
       = new Path(System.getProperty("test.build.data",
           System.getProperty("java.io.tmpdir")),
       TestEntityGroupFSTimelineStore.class.getSimpleName());
-  private static Path testAppDirPath;
-  private static Path testAttemptDirPath;
-  private static Path testDoneDirPath;
 
   private static Configuration config = new YarnConfiguration();
   private static MiniDFSCluster hdfsCluster;
@@ -88,7 +85,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   private static FileContext fc;
   private static FileContextTestHelper fileContextTestHelper =
       new FileContextTestHelper("/tmp/TestEntityGroupFSTimelineStore");
-  private EntityGroupFSTimelineStore store;
+
+  private static List<ApplicationId> sampleAppIds;
+  private static ApplicationId mainTestAppId;
+  private static Path mainTestAppDirPath;
+  private static Path testDoneDirPath;
+  private static String mainEntityLogFileName;
+
+  private EntityGroupFSTimelineStoreForTest store;
   private TimelineEntity entityNew;
 
   @Rule
@@ -101,23 +105,44 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
         YarnConfiguration
             .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
         "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
+    config.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
+        CACHE_TEST_CACHE_SIZE);
     config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
     HdfsConfiguration hdfsConfig = new HdfsConfiguration();
     hdfsCluster
         = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
     fs = hdfsCluster.getFileSystem();
     fc = FileContext.getFileContext(hdfsCluster.getURI(0), config);
-    testAppDirPath = getTestRootPath(TEST_APPLICATION_ID.toString());
-    testAttemptDirPath = new Path(testAppDirPath, TEST_ATTEMPT_DIR_NAME);
-    testDoneDirPath = getTestRootPath("done");
-    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, testDoneDirPath.toString());
 
+    sampleAppIds = new ArrayList<>(CACHE_TEST_CACHE_SIZE + 1);
+    for (int i = 0; i < CACHE_TEST_CACHE_SIZE + 1; i++) {
+      ApplicationId appId = ConverterUtils.toApplicationId(
+          ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_PREFIX_CACHE_TEST
+              + i);
+      sampleAppIds.add(appId);
+    }
+    // Among all sample applicationIds, choose the first one for most of the
+    // tests.
+    mainTestAppId = sampleAppIds.get(0);
+    mainTestAppDirPath = getTestRootPath(mainTestAppId.toString());
+    mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+          + EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
+
+    testDoneDirPath = getTestRootPath("done");
+    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+        testDoneDirPath.toString());
   }
 
   @Before
   public void setup() throws Exception {
-    createTestFiles();
-    store = new EntityGroupFSTimelineStore();
+    for (ApplicationId appId : sampleAppIds) {
+      Path attemotDirPath = new Path(getTestRootPath(appId.toString()),
+          getAttemptDirName(appId));
+      createTestFiles(appId, attemotDirPath);
+    }
+
+    store = new EntityGroupFSTimelineStoreForTest();
     if (currTestName.getMethodName().contains("Plugin")) {
       config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
           EntityGroupPlugInForTest.class.getName());
@@ -130,7 +155,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   @After
   public void tearDown() throws Exception {
     store.stop();
-    fs.delete(testAppDirPath, true);
+    for (ApplicationId appId : sampleAppIds) {
+      fs.delete(getTestRootPath(appId.toString()), true);
+    }
   }
 
   @AfterClass
@@ -144,7 +171,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   @Test
   public void testAppLogsScanLogs() throws Exception {
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
     appLogs.scanForLogs();
     List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
@@ -160,14 +187,14 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
 
     for (LogInfo log : detailLogs) {
       String fileName = log.getFilename();
-      assertEquals(fileName, TEST_ENTITY_LOG_FILE_NAME);
+      assertEquals(fileName, mainEntityLogFileName);
     }
   }
 
   @Test
   public void testMoveToDone() throws Exception {
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
     Path pathBefore = appLogs.getAppDirPath();
     appLogs.moveToDone();
@@ -182,7 +209,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     MutableCounterLong scanned = store.metrics.getEntitiesReadToSummary();
     long beforeScan = scanned.value();
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
     appLogs.scanForLogs();
     appLogs.parseSummaryLogs(tdm);
@@ -194,6 +221,9 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   public void testCleanLogs() throws Exception {
     // Create test dirs and files
     // Irrelevant file, should not be reclaimed
+    String appDirName = mainTestAppId.toString();
+    String attemptDirName = ApplicationAttemptId.appAttemptIdStrPrefix
+        + appDirName + "_1";
     Path irrelevantFilePath = new Path(
             testDoneDirPath, "irrelevant.log");
     FSDataOutputStream stream = fs.create(irrelevantFilePath);
@@ -204,29 +234,29 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
 
     Path doneAppHomeDir = new Path(new Path(testDoneDirPath, "0000"), "001");
     // First application, untouched after creation
-    Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME);
-    Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME);
+    Path appDirClean = new Path(doneAppHomeDir, appDirName);
+    Path attemptDirClean = new Path(appDirClean, attemptDirName);
     fs.mkdirs(attemptDirClean);
     Path filePath = new Path(attemptDirClean, "test.log");
     stream = fs.create(filePath);
     stream.close();
     // Second application, one file touched after creation
-    Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1");
+    Path appDirHoldByFile = new Path(doneAppHomeDir, appDirName + "1");
     Path attemptDirHoldByFile
-        = new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME);
+        = new Path(appDirHoldByFile, attemptDirName);
     fs.mkdirs(attemptDirHoldByFile);
     Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
     stream = fs.create(filePathHold);
     stream.close();
     // Third application, one dir touched after creation
-    Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2");
-    Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME);
+    Path appDirHoldByDir = new Path(doneAppHomeDir, appDirName + "2");
+    Path attemptDirHoldByDir = new Path(appDirHoldByDir, attemptDirName);
     fs.mkdirs(attemptDirHoldByDir);
     Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
     fs.mkdirs(dirPathHold);
     // Fourth application, empty dirs
-    Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3");
-    Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME);
+    Path appDirEmpty = new Path(doneAppHomeDir, appDirName + "3");
+    Path attemptDirEmpty = new Path(appDirEmpty, attemptDirName);
     fs.mkdirs(attemptDirEmpty);
     Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
     fs.mkdirs(dirPathEmpty);
@@ -274,12 +304,15 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
             YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
     // Load data and cache item, prepare timeline store by making a cache item
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
-    EntityCacheItem cacheItem = new EntityCacheItem(config, fs);
+    EntityCacheItem cacheItem = new EntityCacheItem(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        config, fs);
     cacheItem.setAppLogs(appLogs);
     store.setCachedLogs(
-        EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        cacheItem);
     MutableCounterLong detailLogEntityRead =
         store.metrics.getGetEntityToDetailOps();
     MutableStat cacheRefresh = store.metrics.getCacheRefresh();
@@ -291,16 +324,20 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
         = PluginStoreTestUtils.getTdmWithStore(config, store);
 
     // Verify single entity read
-    TimelineEntity entity3 = tdm.getEntity("type_3", "id_3",
+    TimelineEntity entity3 = tdm.getEntity("type_3", mainTestAppId.toString(),
         EnumSet.allOf(TimelineReader.Field.class),
         UserGroupInformation.getLoginUser());
     assertNotNull(entity3);
     assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+    assertEquals(1, cacheItem.getRefCount());
+    assertEquals(1, EntityCacheItem.getActiveStores());
     // Verify multiple entities read
-    TimelineEntities entities = tdm.getEntities("type_3", null, null, null,
-        null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
+    NameValuePair primaryFilter = new NameValuePair(
+        EntityGroupPlugInForTest.APP_ID_FILTER_NAME, mainTestAppId.toString());
+    TimelineEntities entities = tdm.getEntities("type_3", primaryFilter, null,
+        null, null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
         UserGroupInformation.getLoginUser());
-    assertEquals(entities.getEntities().size(), 1);
+    assertEquals(1, entities.getEntities().size());
     for (TimelineEntity entity : entities.getEntities()) {
       assertEquals(entityNew.getStartTime(), entity.getStartTime());
     }
@@ -309,11 +346,79 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     assertEquals(cacheRefreshBefore + 1L, cacheRefresh.lastStat().numSamples());
   }
 
+  @Test(timeout = 90000L)
+  public void testMultiplePluginRead() throws Exception {
+    Thread mainThread = Thread.currentThread();
+    mainThread.setName("testMain");
+    // Verify precondition
+    assertEquals(EntityGroupPlugInForTest.class.getName(),
+        store.getConfig().get(
+            YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
+    // Prepare timeline store by making cache items
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
+            AppState.COMPLETED);
+    final EntityCacheItem cacheItem = new EntityCacheItem(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        config, fs);
+
+    cacheItem.setAppLogs(appLogs);
+    store.setCachedLogs(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId),
+        cacheItem);
+
+    // Launch the blocking read call in a future
+    ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
+    FutureTask<TimelineEntity> blockingReader =
+        new FutureTask<>(new Callable<TimelineEntity>() {
+          public TimelineEntity call() throws Exception {
+            Thread currThread = Thread.currentThread();
+            currThread.setName("blockingReader");
+            return store.getEntityBlocking(mainTestAppId.toString(), "type_3",
+                EnumSet.allOf(TimelineReader.Field.class));
+          }});
+    threadExecutor.execute(blockingReader);
+    try {
+      while (!store.testCacheReferenced) {
+        Thread.sleep(300);
+      }
+    } catch (InterruptedException e) {
+      fail("Interrupted on exception " + e);
+    }
+    // Try refill the cache after the first cache item is referenced
+    for (ApplicationId appId : sampleAppIds) {
+      // Skip the first appId since it's already in cache
+      if (appId.equals(mainTestAppId)) {
+        continue;
+      }
+      EntityGroupFSTimelineStore.AppLogs currAppLog =
+          store.new AppLogs(appId, getTestRootPath(appId.toString()),
+              AppState.COMPLETED);
+      EntityCacheItem item = new EntityCacheItem(
+          EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
+          config, fs);
+      item.setAppLogs(currAppLog);
+      store.setCachedLogs(
+          EntityGroupPlugInForTest.getStandardTimelineGroupId(appId),
+          item);
+    }
+    // At this time, the cache item of the blocking reader should be evicted.
+    assertEquals(1, cacheItem.getRefCount());
+    store.testCanProceed = true;
+    TimelineEntity entity3 = blockingReader.get();
+
+    assertNotNull(entity3);
+    assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+    assertEquals(0, cacheItem.getRefCount());
+
+    threadExecutor.shutdownNow();
+  }
+
   @Test
   public void testSummaryRead() throws Exception {
     // Load data
     EntityGroupFSTimelineStore.AppLogs appLogs =
-        store.new AppLogs(TEST_APPLICATION_ID, testAppDirPath,
+        store.new AppLogs(mainTestAppId, mainTestAppDirPath,
         AppState.COMPLETED);
     MutableCounterLong summaryLogEntityRead
         = store.metrics.getGetEntityToSummaryOps();
@@ -331,28 +436,32 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
         UserGroupInformation.getLoginUser());
     assertEquals(entities.getEntities().size(), 1);
     for (TimelineEntity entity : entities.getEntities()) {
-      assertEquals((Long) 123l, entity.getStartTime());
+      assertEquals((Long) 123L, entity.getStartTime());
     }
     // Verify metrics
     assertEquals(numEntityReadBefore + 5L, summaryLogEntityRead.value());
 
   }
 
-  private void createTestFiles() throws IOException {
+  private void createTestFiles(ApplicationId appId, Path attemptDirPath)
+      throws IOException {
     TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
     PluginStoreTestUtils.writeEntities(entities,
-        new Path(testAttemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
-
+        new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
+    Map<String, Set<Object>> primaryFilters = new HashMap<>();
+    Set<Object> appSet = new HashSet<Object>();
+    appSet.add(appId.toString());
+    primaryFilters.put(EntityGroupPlugInForTest.APP_ID_FILTER_NAME, appSet);
     entityNew = PluginStoreTestUtils
-        .createEntity("id_3", "type_3", 789l, null, null,
-            null, null, "domain_id_1");
+        .createEntity(appId.toString(), "type_3", 789L, null, null,
+            primaryFilters, null, "domain_id_1");
     TimelineEntities entityList = new TimelineEntities();
     entityList.addEntity(entityNew);
     PluginStoreTestUtils.writeEntities(entityList,
-        new Path(testAttemptDirPath, TEST_ENTITY_LOG_FILE_NAME), fs);
+        new Path(attemptDirPath, mainEntityLogFileName), fs);
 
     FSDataOutputStream out = fs.create(
-        new Path(testAttemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
+        new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
     out.close();
   }
 
@@ -360,4 +469,41 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     return fileContextTestHelper.getTestRootPath(fc, pathString);
   }
 
+  private static String getAttemptDirName(ApplicationId appId) {
+    return ApplicationAttemptId.appAttemptIdStrPrefix + appId.toString() + "_1";
+  }
+
+  private static class EntityGroupFSTimelineStoreForTest
+      extends EntityGroupFSTimelineStore {
+    // Flags used for the concurrent testing environment
+    private volatile boolean testCanProceed = false;
+    private volatile boolean testCacheReferenced = false;
+
+    TimelineEntity getEntityBlocking(String entityId, String entityType,
+        EnumSet<Field> fieldsToRetrieve) throws IOException {
+      List<EntityCacheItem> relatedCacheItems = new ArrayList<>();
+      List<TimelineStore> stores = getTimelineStoresForRead(entityId,
+          entityType, relatedCacheItems);
+
+      testCacheReferenced = true;
+      try {
+        while (!testCanProceed) {
+          Thread.sleep(1000);
+        }
+      } catch (InterruptedException e) {
+        fail("Interrupted " + e);
+      }
+
+      for (TimelineStore store : stores) {
+        TimelineEntity e =
+            store.getEntity(entityId, entityType, fieldsToRetrieve);
+        if (e != null) {
+          tryReleaseCacheItems(relatedCacheItems);
+          return e;
+        }
+      }
+      tryReleaseCacheItems(relatedCacheItems);
+      return null;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org