You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/08/04 23:03:44 UTC

hadoop git commit: YARN-6811. [ATS1.5] All history logs should be kept under its own User Directory. Contributed by Rohith Sharma K S.

Repository: hadoop
Updated Branches:
  refs/heads/trunk bbc6d254c -> f44b349b8


YARN-6811. [ATS1.5] All history logs should be kept under its own User Directory. Contributed by Rohith Sharma K S.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f44b349b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f44b349b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f44b349b

Branch: refs/heads/trunk
Commit: f44b349b813508f0f6d99ca10bddba683dedf6c4
Parents: bbc6d25
Author: Junping Du <ju...@apache.org>
Authored: Fri Aug 4 16:03:56 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Fri Aug 4 16:03:56 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  4 +
 .../api/impl/FileSystemTimelineWriter.java      | 40 ++++++--
 .../src/main/resources/yarn-default.xml         | 10 ++
 .../api/impl/TestTimelineClientForATS1_5.java   | 81 ++++++++++++----
 .../timeline/EntityGroupFSTimelineStore.java    | 23 ++++-
 .../TestEntityGroupFSTimelineStore.java         | 99 ++++++++++++++++++--
 6 files changed, 224 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d608df8..71a7134 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2069,6 +2069,10 @@ public class YarnConfiguration extends Configuration {
       = TIMELINE_SERVICE_PREFIX
       + "entity-file.fs-support-append";
 
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "with-user-dir";
+
   /**
    * Settings for timeline service v2.0
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index fc3385b..b7bb48e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -145,9 +145,12 @@ public class FileSystemTimelineWriter extends TimelineWriter{
         new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl,
             timerTaskTTL);
 
-    this.isAppendSupported =
-        conf.getBoolean(
-            YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+    this.isAppendSupported = conf.getBoolean(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+
+    boolean storeInsideUserDir = conf.getBoolean(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR,
+        false);
 
     objMapper = createObjectMapper();
 
@@ -157,8 +160,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
         YarnConfiguration
             .DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE);
 
-    attemptDirCache =
-        new AttemptDirCache(attemptDirCacheSize, fs, activePath);
+    attemptDirCache = new AttemptDirCache(attemptDirCacheSize, fs, activePath,
+        authUgi, storeInsideUserDir);
 
     if (LOG.isDebugEnabled()) {
       StringBuilder debugMSG = new StringBuilder();
@@ -171,6 +174,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
               + "=" + ttl + ", " +
           YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
               + "=" + isAppendSupported + ", " +
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR
+              + "=" + storeInsideUserDir + ", " +
           YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
               + "=" + activePath);
 
@@ -946,8 +951,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
     private final Map<ApplicationAttemptId, Path> attemptDirCache;
     private final FileSystem fs;
     private final Path activePath;
+    private final UserGroupInformation authUgi;
+    private final boolean storeInsideUserDir;
 
-    public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) {
+    public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath,
+        UserGroupInformation ugi, boolean storeInsideUserDir) {
       this.attemptDirCacheSize = cacheSize;
       this.attemptDirCache =
           new LinkedHashMap<ApplicationAttemptId, Path>(
@@ -961,6 +969,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
           };
       this.fs = fs;
       this.activePath = activePath;
+      this.authUgi = ugi;
+      this.storeInsideUserDir = storeInsideUserDir;
     }
 
     public Path getAppAttemptDir(ApplicationAttemptId attemptId)
@@ -993,8 +1003,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
     }
 
     private Path createApplicationDir(ApplicationId appId) throws IOException {
-      Path appDir =
-          new Path(activePath, appId.toString());
+      Path appRootDir = getAppRootDir(authUgi.getShortUserName());
+      Path appDir = new Path(appRootDir, appId.toString());
       if (FileSystem.mkdirs(fs, appDir,
           new FsPermission(APP_LOG_DIR_PERMISSIONS))) {
         if (LOG.isDebugEnabled()) {
@@ -1003,5 +1013,19 @@ public class FileSystemTimelineWriter extends TimelineWriter{
       }
       return appDir;
     }
+
+    private Path getAppRootDir(String user) throws IOException {
+      if (!storeInsideUserDir) {
+        return activePath;
+      }
+      Path userDir = new Path(activePath, user);
+      if (FileSystem.mkdirs(fs, userDir,
+          new FsPermission(APP_LOG_DIR_PERMISSIONS))) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("New user directory created - " + userDir);
+        }
+      }
+      return userDir;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 564a451..95b8a88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3244,4 +3244,14 @@
     <value>0.0.0.0:8091</value>
   </property>
 
+  <property>
+    <description>
+       It is TimelineClient 1.5 configuration whether to store active
+       application’s timeline data with in user directory i.e
+       ${yarn.timeline-service.entity-group-fs-store.active-dir}/${user.name}
+    </description>
+    <name>yarn.timeline-service.entity-group-fs-store.with-user-dir</name>
+    <value>false</value>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
index d3826e1..8573033 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
@@ -59,25 +59,30 @@ public class TestTimelineClientForATS1_5 {
   private static FileContext localFS;
   private static File localActiveDir;
   private TimelineWriter spyTimelineWriter;
+  private UserGroupInformation authUgi;
 
   @Before
   public void setup() throws Exception {
     localFS = FileContext.getLocalFSFileContext();
     localActiveDir =
         new File("target", this.getClass().getSimpleName() + "-activeDir")
-          .getAbsoluteFile();
+            .getAbsoluteFile();
     localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
     localActiveDir.mkdir();
     LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
+    authUgi = UserGroupInformation.getCurrentUser();
+  }
+
+  private YarnConfiguration getConfigurations() {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
     conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
-      localActiveDir.getAbsolutePath());
+        localActiveDir.getAbsolutePath());
     conf.set(
-      YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
-      "summary_type");
-    client = createTimelineClient(conf);
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
+        "summary_type");
+    return conf;
   }
 
   @After
@@ -90,6 +95,21 @@ public class TestTimelineClientForATS1_5 {
 
   @Test
   public void testPostEntities() throws Exception {
+    client = createTimelineClient(getConfigurations());
+    verifyForPostEntities(false);
+  }
+
+  @Test
+  public void testPostEntitiesToKeepUnderUserDir() throws Exception {
+    YarnConfiguration conf = getConfigurations();
+    conf.setBoolean(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR,
+        true);
+    client = createTimelineClient(conf);
+    verifyForPostEntities(true);
+  }
+
+  private void verifyForPostEntities(boolean storeInsideUserDir) {
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     TimelineEntityGroupId groupId =
@@ -118,7 +138,8 @@ public class TestTimelineClientForATS1_5 {
       entityTDB[0] = entities[0];
       verify(spyTimelineWriter, times(1)).putEntities(entityTDB);
       Assert.assertTrue(localFS.util().exists(
-        new Path(getAppAttemptDir(attemptId1), "summarylog-"
+          new Path(getAppAttemptDir(attemptId1, storeInsideUserDir),
+              "summarylog-"
             + attemptId1.toString())));
       reset(spyTimelineWriter);
 
@@ -132,13 +153,16 @@ public class TestTimelineClientForATS1_5 {
       verify(spyTimelineWriter, times(0)).putEntities(
         any(TimelineEntity[].class));
       Assert.assertTrue(localFS.util().exists(
-        new Path(getAppAttemptDir(attemptId2), "summarylog-"
+          new Path(getAppAttemptDir(attemptId2, storeInsideUserDir),
+              "summarylog-"
             + attemptId2.toString())));
       Assert.assertTrue(localFS.util().exists(
-        new Path(getAppAttemptDir(attemptId2), "entitylog-"
+          new Path(getAppAttemptDir(attemptId2, storeInsideUserDir),
+              "entitylog-"
             + groupId.toString())));
       Assert.assertTrue(localFS.util().exists(
-        new Path(getAppAttemptDir(attemptId2), "entitylog-"
+          new Path(getAppAttemptDir(attemptId2, storeInsideUserDir),
+              "entitylog-"
             + groupId2.toString())));
       reset(spyTimelineWriter);
     } catch (Exception e) {
@@ -148,6 +172,21 @@ public class TestTimelineClientForATS1_5 {
 
   @Test
   public void testPutDomain() {
+    client = createTimelineClient(getConfigurations());
+    verifyForPutDomain(false);
+  }
+
+  @Test
+  public void testPutDomainToKeepUnderUserDir() {
+    YarnConfiguration conf = getConfigurations();
+    conf.setBoolean(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR,
+        true);
+    client = createTimelineClient(conf);
+    verifyForPutDomain(true);
+  }
+
+  private void verifyForPutDomain(boolean storeInsideUserDir) {
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     ApplicationAttemptId attemptId1 =
@@ -161,23 +200,33 @@ public class TestTimelineClientForATS1_5 {
 
       client.putDomain(attemptId1, domain);
       verify(spyTimelineWriter, times(0)).putDomain(domain);
-      Assert.assertTrue(localFS.util().exists(
-        new Path(getAppAttemptDir(attemptId1), "domainlog-"
-            + attemptId1.toString())));
+      Assert.assertTrue(localFS.util()
+          .exists(new Path(getAppAttemptDir(attemptId1, storeInsideUserDir),
+              "domainlog-" + attemptId1.toString())));
       reset(spyTimelineWriter);
     } catch (Exception e) {
       Assert.fail("Exception is not expected." + e);
     }
   }
 
-  private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) {
-    Path appDir =
-        new Path(localActiveDir.getAbsolutePath(), appAttemptId
-          .getApplicationId().toString());
+  private Path getAppAttemptDir(ApplicationAttemptId appAttemptId,
+      boolean storeInsideUserDir) {
+    Path userDir = getUserDir(appAttemptId, storeInsideUserDir);
+    Path appDir = new Path(userDir, appAttemptId.getApplicationId().toString());
     Path attemptDir = new Path(appDir, appAttemptId.toString());
     return attemptDir;
   }
 
+  private Path getUserDir(ApplicationAttemptId appAttemptId,
+      boolean storeInsideUserDir) {
+    if (!storeInsideUserDir) {
+      return new Path(localActiveDir.getAbsolutePath());
+    }
+    Path userDir =
+        new Path(localActiveDir.getAbsolutePath(), authUgi.getShortUserName());
+    return userDir;
+  }
+
   private static TimelineEntity generateEntity(String type) {
     TimelineEntity entity = new TimelineEntity();
     entity.setEntityId("entity id");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index 1675a48..80baf89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -356,7 +356,13 @@ public class EntityGroupFSTimelineStore extends CompositeService
   @VisibleForTesting
   int scanActiveLogs() throws IOException {
     long startTime = Time.monotonicNow();
-    RemoteIterator<FileStatus> iter = list(activeRootPath);
+    int logsToScanCount = scanActiveLogs(activeRootPath);
+    metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
+    return logsToScanCount;
+  }
+
+  int scanActiveLogs(Path dir) throws IOException {
+    RemoteIterator<FileStatus> iter = list(dir);
     int logsToScanCount = 0;
     while (iter.hasNext()) {
       FileStatus stat = iter.next();
@@ -368,10 +374,9 @@ public class EntityGroupFSTimelineStore extends CompositeService
         AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
         executor.execute(new ActiveLogParser(logs));
       } else {
-        LOG.debug("Unable to parse entry {}", name);
+        logsToScanCount += scanActiveLogs(stat.getPath());
       }
     }
-    metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
     return logsToScanCount;
   }
 
@@ -418,6 +423,18 @@ public class EntityGroupFSTimelineStore extends CompositeService
         appDirPath = getActiveAppPath(applicationId);
         if (fs.exists(appDirPath)) {
           appState = AppState.ACTIVE;
+        } else {
+          // check for user directory inside active path
+          RemoteIterator<FileStatus> iter = list(activeRootPath);
+          while (iter.hasNext()) {
+            Path child = new Path(iter.next().getPath().getName(),
+                applicationId.toString());
+            appDirPath = new Path(activeRootPath, child);
+            if (fs.exists(appDirPath)) {
+              appState = AppState.ACTIVE;
+              break;
+            }
+          }
         }
       }
       if (appState != AppState.UNKNOWN) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index 8540d45..0458722 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
+import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -58,7 +60,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -91,6 +92,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   private static ApplicationId mainTestAppId;
   private static Path mainTestAppDirPath;
   private static Path testDoneDirPath;
+  private static Path testActiveDirPath;
   private static String mainEntityLogFileName;
 
   private EntityGroupFSTimelineStore store;
@@ -125,23 +127,28 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
               + i);
       sampleAppIds.add(appId);
     }
+    testActiveDirPath = getTestRootPath("active");
     // Among all sample applicationIds, choose the first one for most of the
     // tests.
     mainTestAppId = sampleAppIds.get(0);
-    mainTestAppDirPath = getTestRootPath(mainTestAppId.toString());
+    mainTestAppDirPath = new Path(testActiveDirPath, mainTestAppId.toString());
     mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
           + EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
 
     testDoneDirPath = getTestRootPath("done");
     config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
         testDoneDirPath.toString());
+    config.set(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+        testActiveDirPath.toString());
   }
 
   @Before
   public void setup() throws Exception {
     for (ApplicationId appId : sampleAppIds) {
-      Path attemotDirPath = new Path(getTestRootPath(appId.toString()),
-          getAttemptDirName(appId));
+      Path attemotDirPath =
+          new Path(new Path(testActiveDirPath, appId.toString()),
+              getAttemptDirName(appId));
       createTestFiles(appId, attemotDirPath);
     }
 
@@ -178,7 +185,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
   public void tearDown() throws Exception {
     store.stop();
     for (ApplicationId appId : sampleAppIds) {
-      fs.delete(getTestRootPath(appId.toString()), true);
+      fs.delete(new Path(testActiveDirPath,appId.toString()), true);
     }
     if (testJar != null) {
       testJar.delete();
@@ -414,8 +421,88 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
 
   }
 
+  @Test
+  public void testGetEntityPluginRead() throws Exception {
+    EntityGroupFSTimelineStore store = null;
+    ApplicationId appId =
+        ApplicationId.fromString("application_1501509265053_0001");
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    Path userBase = new Path(testActiveDirPath, user);
+    Path userAppRoot = new Path(userBase, appId.toString());
+    Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId));
+
+    try {
+      store = createAndStartTimelineStore(AppState.ACTIVE);
+      String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+          + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
+      createTestFiles(appId, attemotDirPath, logFileName);
+      TimelineEntity entity = store.getEntity(entityNew.getEntityId(),
+          entityNew.getEntityType(), EnumSet.allOf(Field.class));
+      assertNotNull(entity);
+      assertEquals(entityNew.getEntityId(), entity.getEntityId());
+      assertEquals(entityNew.getEntityType(), entity.getEntityType());
+    } finally {
+      if (store != null) {
+        store.stop();
+      }
+      fs.delete(userBase, true);
+    }
+  }
+
+  @Test
+  public void testScanActiveLogsAndMoveToDonePluginRead() throws Exception {
+    EntityGroupFSTimelineStore store = null;
+    ApplicationId appId =
+        ApplicationId.fromString("application_1501509265053_0002");
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    Path userBase = new Path(testActiveDirPath, user);
+    Path userAppRoot = new Path(userBase, appId.toString());
+    Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId));
+
+    try {
+      store = createAndStartTimelineStore(AppState.COMPLETED);
+      String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+          + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
+      createTestFiles(appId, attemotDirPath, logFileName);
+      store.scanActiveLogs();
+
+      TimelineEntity entity = store.getEntity(entityNew.getEntityId(),
+          entityNew.getEntityType(), EnumSet.allOf(Field.class));
+      assertNotNull(entity);
+      assertEquals(entityNew.getEntityId(), entity.getEntityId());
+      assertEquals(entityNew.getEntityType(), entity.getEntityType());
+    } finally {
+      if (store != null) {
+        store.stop();
+      }
+      fs.delete(userBase, true);
+    }
+  }
+
+  private EntityGroupFSTimelineStore createAndStartTimelineStore(
+      AppState appstate) {
+    // stop before creating new store to get the lock
+    store.stop();
+    
+    EntityGroupFSTimelineStore newStore = new EntityGroupFSTimelineStore() {
+      @Override
+      protected AppState getAppState(ApplicationId appId) throws IOException {
+        return appstate;
+      }
+    };
+    newStore.init(config);
+    newStore.setFs(fs);
+    newStore.start();
+    return newStore;
+  }
+
   private void createTestFiles(ApplicationId appId, Path attemptDirPath)
       throws IOException {
+    createTestFiles(appId, attemptDirPath, mainEntityLogFileName);
+  }
+
+  private void createTestFiles(ApplicationId appId, Path attemptDirPath,
+      String logPath) throws IOException {
     TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
     PluginStoreTestUtils.writeEntities(entities,
         new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
@@ -429,7 +516,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
     TimelineEntities entityList = new TimelineEntities();
     entityList.addEntity(entityNew);
     PluginStoreTestUtils.writeEntities(entityList,
-        new Path(attemptDirPath, mainEntityLogFileName), fs);
+        new Path(attemptDirPath, logPath), fs);
 
     FSDataOutputStream out = fs.create(
         new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org