You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2017/08/04 23:03:44 UTC
hadoop git commit: YARN-6811. [ATS1.5] All history logs should be
kept under its own User Directory. Contributed by Rohith Sharma K S.
Repository: hadoop
Updated Branches:
refs/heads/trunk bbc6d254c -> f44b349b8
YARN-6811. [ATS1.5] All history logs should be kept under its own User Directory. Contributed by Rohith Sharma K S.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f44b349b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f44b349b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f44b349b
Branch: refs/heads/trunk
Commit: f44b349b813508f0f6d99ca10bddba683dedf6c4
Parents: bbc6d25
Author: Junping Du <ju...@apache.org>
Authored: Fri Aug 4 16:03:56 2017 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Fri Aug 4 16:03:56 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 4 +
.../api/impl/FileSystemTimelineWriter.java | 40 ++++++--
.../src/main/resources/yarn-default.xml | 10 ++
.../api/impl/TestTimelineClientForATS1_5.java | 81 ++++++++++++----
.../timeline/EntityGroupFSTimelineStore.java | 23 ++++-
.../TestEntityGroupFSTimelineStore.java | 99 ++++++++++++++++++--
6 files changed, 224 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d608df8..71a7134 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2069,6 +2069,10 @@ public class YarnConfiguration extends Configuration {
= TIMELINE_SERVICE_PREFIX
+ "entity-file.fs-support-append";
+ public static final String
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR =
+ TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "with-user-dir";
+
/**
* Settings for timeline service v2.0
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
index fc3385b..b7bb48e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -145,9 +145,12 @@ public class FileSystemTimelineWriter extends TimelineWriter{
new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl,
timerTaskTTL);
- this.isAppendSupported =
- conf.getBoolean(
- YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+ this.isAppendSupported = conf.getBoolean(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+
+ boolean storeInsideUserDir = conf.getBoolean(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR,
+ false);
objMapper = createObjectMapper();
@@ -157,8 +160,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
YarnConfiguration
.DEFAULT_TIMELINE_SERVICE_CLIENT_INTERNAL_ATTEMPT_DIR_CACHE_SIZE);
- attemptDirCache =
- new AttemptDirCache(attemptDirCacheSize, fs, activePath);
+ attemptDirCache = new AttemptDirCache(attemptDirCacheSize, fs, activePath,
+ authUgi, storeInsideUserDir);
if (LOG.isDebugEnabled()) {
StringBuilder debugMSG = new StringBuilder();
@@ -171,6 +174,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
+ "=" + ttl + ", " +
YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ "=" + isAppendSupported + ", " +
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR
+ + "=" + storeInsideUserDir + ", " +
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
+ "=" + activePath);
@@ -946,8 +951,11 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private final Map<ApplicationAttemptId, Path> attemptDirCache;
private final FileSystem fs;
private final Path activePath;
+ private final UserGroupInformation authUgi;
+ private final boolean storeInsideUserDir;
- public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath) {
+ public AttemptDirCache(int cacheSize, FileSystem fs, Path activePath,
+ UserGroupInformation ugi, boolean storeInsideUserDir) {
this.attemptDirCacheSize = cacheSize;
this.attemptDirCache =
new LinkedHashMap<ApplicationAttemptId, Path>(
@@ -961,6 +969,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
};
this.fs = fs;
this.activePath = activePath;
+ this.authUgi = ugi;
+ this.storeInsideUserDir = storeInsideUserDir;
}
public Path getAppAttemptDir(ApplicationAttemptId attemptId)
@@ -993,8 +1003,8 @@ public class FileSystemTimelineWriter extends TimelineWriter{
}
private Path createApplicationDir(ApplicationId appId) throws IOException {
- Path appDir =
- new Path(activePath, appId.toString());
+ Path appRootDir = getAppRootDir(authUgi.getShortUserName());
+ Path appDir = new Path(appRootDir, appId.toString());
if (FileSystem.mkdirs(fs, appDir,
new FsPermission(APP_LOG_DIR_PERMISSIONS))) {
if (LOG.isDebugEnabled()) {
@@ -1003,5 +1013,19 @@ public class FileSystemTimelineWriter extends TimelineWriter{
}
return appDir;
}
+
+ private Path getAppRootDir(String user) throws IOException {
+ if (!storeInsideUserDir) {
+ return activePath;
+ }
+ Path userDir = new Path(activePath, user);
+ if (FileSystem.mkdirs(fs, userDir,
+ new FsPermission(APP_LOG_DIR_PERMISSIONS))) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("New user directory created - " + userDir);
+ }
+ }
+ return userDir;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 564a451..95b8a88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3244,4 +3244,14 @@
<value>0.0.0.0:8091</value>
</property>
+ <property>
+ <description>
+ It is TimelineClient 1.5 configuration whether to store active
+ application’s timeline data with in user directory i.e
+ ${yarn.timeline-service.entity-group-fs-store.active-dir}/${user.name}
+ </description>
+ <name>yarn.timeline-service.entity-group-fs-store.with-user-dir</name>
+ <value>false</value>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
index d3826e1..8573033 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
@@ -59,25 +59,30 @@ public class TestTimelineClientForATS1_5 {
private static FileContext localFS;
private static File localActiveDir;
private TimelineWriter spyTimelineWriter;
+ private UserGroupInformation authUgi;
@Before
public void setup() throws Exception {
localFS = FileContext.getLocalFSFileContext();
localActiveDir =
new File("target", this.getClass().getSimpleName() + "-activeDir")
- .getAbsoluteFile();
+ .getAbsoluteFile();
localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
localActiveDir.mkdir();
LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
+ authUgi = UserGroupInformation.getCurrentUser();
+ }
+
+ private YarnConfiguration getConfigurations() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
- localActiveDir.getAbsolutePath());
+ localActiveDir.getAbsolutePath());
conf.set(
- YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
- "summary_type");
- client = createTimelineClient(conf);
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
+ "summary_type");
+ return conf;
}
@After
@@ -90,6 +95,21 @@ public class TestTimelineClientForATS1_5 {
@Test
public void testPostEntities() throws Exception {
+ client = createTimelineClient(getConfigurations());
+ verifyForPostEntities(false);
+ }
+
+ @Test
+ public void testPostEntitiesToKeepUnderUserDir() throws Exception {
+ YarnConfiguration conf = getConfigurations();
+ conf.setBoolean(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR,
+ true);
+ client = createTimelineClient(conf);
+ verifyForPostEntities(true);
+ }
+
+ private void verifyForPostEntities(boolean storeInsideUserDir) {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
TimelineEntityGroupId groupId =
@@ -118,7 +138,8 @@ public class TestTimelineClientForATS1_5 {
entityTDB[0] = entities[0];
verify(spyTimelineWriter, times(1)).putEntities(entityTDB);
Assert.assertTrue(localFS.util().exists(
- new Path(getAppAttemptDir(attemptId1), "summarylog-"
+ new Path(getAppAttemptDir(attemptId1, storeInsideUserDir),
+ "summarylog-"
+ attemptId1.toString())));
reset(spyTimelineWriter);
@@ -132,13 +153,16 @@ public class TestTimelineClientForATS1_5 {
verify(spyTimelineWriter, times(0)).putEntities(
any(TimelineEntity[].class));
Assert.assertTrue(localFS.util().exists(
- new Path(getAppAttemptDir(attemptId2), "summarylog-"
+ new Path(getAppAttemptDir(attemptId2, storeInsideUserDir),
+ "summarylog-"
+ attemptId2.toString())));
Assert.assertTrue(localFS.util().exists(
- new Path(getAppAttemptDir(attemptId2), "entitylog-"
+ new Path(getAppAttemptDir(attemptId2, storeInsideUserDir),
+ "entitylog-"
+ groupId.toString())));
Assert.assertTrue(localFS.util().exists(
- new Path(getAppAttemptDir(attemptId2), "entitylog-"
+ new Path(getAppAttemptDir(attemptId2, storeInsideUserDir),
+ "entitylog-"
+ groupId2.toString())));
reset(spyTimelineWriter);
} catch (Exception e) {
@@ -148,6 +172,21 @@ public class TestTimelineClientForATS1_5 {
@Test
public void testPutDomain() {
+ client = createTimelineClient(getConfigurations());
+ verifyForPutDomain(false);
+ }
+
+ @Test
+ public void testPutDomainToKeepUnderUserDir() {
+ YarnConfiguration conf = getConfigurations();
+ conf.setBoolean(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_WITH_USER_DIR,
+ true);
+ client = createTimelineClient(conf);
+ verifyForPutDomain(true);
+ }
+
+ private void verifyForPutDomain(boolean storeInsideUserDir) {
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId attemptId1 =
@@ -161,23 +200,33 @@ public class TestTimelineClientForATS1_5 {
client.putDomain(attemptId1, domain);
verify(spyTimelineWriter, times(0)).putDomain(domain);
- Assert.assertTrue(localFS.util().exists(
- new Path(getAppAttemptDir(attemptId1), "domainlog-"
- + attemptId1.toString())));
+ Assert.assertTrue(localFS.util()
+ .exists(new Path(getAppAttemptDir(attemptId1, storeInsideUserDir),
+ "domainlog-" + attemptId1.toString())));
reset(spyTimelineWriter);
} catch (Exception e) {
Assert.fail("Exception is not expected." + e);
}
}
- private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) {
- Path appDir =
- new Path(localActiveDir.getAbsolutePath(), appAttemptId
- .getApplicationId().toString());
+ private Path getAppAttemptDir(ApplicationAttemptId appAttemptId,
+ boolean storeInsideUserDir) {
+ Path userDir = getUserDir(appAttemptId, storeInsideUserDir);
+ Path appDir = new Path(userDir, appAttemptId.getApplicationId().toString());
Path attemptDir = new Path(appDir, appAttemptId.toString());
return attemptDir;
}
+ private Path getUserDir(ApplicationAttemptId appAttemptId,
+ boolean storeInsideUserDir) {
+ if (!storeInsideUserDir) {
+ return new Path(localActiveDir.getAbsolutePath());
+ }
+ Path userDir =
+ new Path(localActiveDir.getAbsolutePath(), authUgi.getShortUserName());
+ return userDir;
+ }
+
private static TimelineEntity generateEntity(String type) {
TimelineEntity entity = new TimelineEntity();
entity.setEntityId("entity id");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
index 1675a48..80baf89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -356,7 +356,13 @@ public class EntityGroupFSTimelineStore extends CompositeService
@VisibleForTesting
int scanActiveLogs() throws IOException {
long startTime = Time.monotonicNow();
- RemoteIterator<FileStatus> iter = list(activeRootPath);
+ int logsToScanCount = scanActiveLogs(activeRootPath);
+ metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
+ return logsToScanCount;
+ }
+
+ int scanActiveLogs(Path dir) throws IOException {
+ RemoteIterator<FileStatus> iter = list(dir);
int logsToScanCount = 0;
while (iter.hasNext()) {
FileStatus stat = iter.next();
@@ -368,10 +374,9 @@ public class EntityGroupFSTimelineStore extends CompositeService
AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
executor.execute(new ActiveLogParser(logs));
} else {
- LOG.debug("Unable to parse entry {}", name);
+ logsToScanCount += scanActiveLogs(stat.getPath());
}
}
- metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
return logsToScanCount;
}
@@ -418,6 +423,18 @@ public class EntityGroupFSTimelineStore extends CompositeService
appDirPath = getActiveAppPath(applicationId);
if (fs.exists(appDirPath)) {
appState = AppState.ACTIVE;
+ } else {
+ // check for user directory inside active path
+ RemoteIterator<FileStatus> iter = list(activeRootPath);
+ while (iter.hasNext()) {
+ Path child = new Path(iter.next().getPath().getName(),
+ applicationId.toString());
+ appDirPath = new Path(activeRootPath, child);
+ if (fs.exists(appDirPath)) {
+ appState = AppState.ACTIVE;
+ break;
+ }
+ }
}
}
if (appState != AppState.UNKNOWN) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f44b349b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
index 8540d45..0458722 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
+import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.AfterClass;
@@ -58,7 +60,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -91,6 +92,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
private static ApplicationId mainTestAppId;
private static Path mainTestAppDirPath;
private static Path testDoneDirPath;
+ private static Path testActiveDirPath;
private static String mainEntityLogFileName;
private EntityGroupFSTimelineStore store;
@@ -125,23 +127,28 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
+ i);
sampleAppIds.add(appId);
}
+ testActiveDirPath = getTestRootPath("active");
// Among all sample applicationIds, choose the first one for most of the
// tests.
mainTestAppId = sampleAppIds.get(0);
- mainTestAppDirPath = getTestRootPath(mainTestAppId.toString());
+ mainTestAppDirPath = new Path(testActiveDirPath, mainTestAppId.toString());
mainEntityLogFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ EntityGroupPlugInForTest.getStandardTimelineGroupId(mainTestAppId);
testDoneDirPath = getTestRootPath("done");
config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
testDoneDirPath.toString());
+ config.set(
+ YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+ testActiveDirPath.toString());
}
@Before
public void setup() throws Exception {
for (ApplicationId appId : sampleAppIds) {
- Path attemotDirPath = new Path(getTestRootPath(appId.toString()),
- getAttemptDirName(appId));
+ Path attemotDirPath =
+ new Path(new Path(testActiveDirPath, appId.toString()),
+ getAttemptDirName(appId));
createTestFiles(appId, attemotDirPath);
}
@@ -178,7 +185,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
public void tearDown() throws Exception {
store.stop();
for (ApplicationId appId : sampleAppIds) {
- fs.delete(getTestRootPath(appId.toString()), true);
+ fs.delete(new Path(testActiveDirPath,appId.toString()), true);
}
if (testJar != null) {
testJar.delete();
@@ -414,8 +421,88 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
}
+ @Test
+ public void testGetEntityPluginRead() throws Exception {
+ EntityGroupFSTimelineStore store = null;
+ ApplicationId appId =
+ ApplicationId.fromString("application_1501509265053_0001");
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path userBase = new Path(testActiveDirPath, user);
+ Path userAppRoot = new Path(userBase, appId.toString());
+ Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId));
+
+ try {
+ store = createAndStartTimelineStore(AppState.ACTIVE);
+ String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
+ createTestFiles(appId, attemotDirPath, logFileName);
+ TimelineEntity entity = store.getEntity(entityNew.getEntityId(),
+ entityNew.getEntityType(), EnumSet.allOf(Field.class));
+ assertNotNull(entity);
+ assertEquals(entityNew.getEntityId(), entity.getEntityId());
+ assertEquals(entityNew.getEntityType(), entity.getEntityType());
+ } finally {
+ if (store != null) {
+ store.stop();
+ }
+ fs.delete(userBase, true);
+ }
+ }
+
+ @Test
+ public void testScanActiveLogsAndMoveToDonePluginRead() throws Exception {
+ EntityGroupFSTimelineStore store = null;
+ ApplicationId appId =
+ ApplicationId.fromString("application_1501509265053_0002");
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path userBase = new Path(testActiveDirPath, user);
+ Path userAppRoot = new Path(userBase, appId.toString());
+ Path attemotDirPath = new Path(userAppRoot, getAttemptDirName(appId));
+
+ try {
+ store = createAndStartTimelineStore(AppState.COMPLETED);
+ String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+ + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
+ createTestFiles(appId, attemotDirPath, logFileName);
+ store.scanActiveLogs();
+
+ TimelineEntity entity = store.getEntity(entityNew.getEntityId(),
+ entityNew.getEntityType(), EnumSet.allOf(Field.class));
+ assertNotNull(entity);
+ assertEquals(entityNew.getEntityId(), entity.getEntityId());
+ assertEquals(entityNew.getEntityType(), entity.getEntityType());
+ } finally {
+ if (store != null) {
+ store.stop();
+ }
+ fs.delete(userBase, true);
+ }
+ }
+
+ private EntityGroupFSTimelineStore createAndStartTimelineStore(
+ AppState appstate) {
+ // stop before creating new store to get the lock
+ store.stop();
+
+ EntityGroupFSTimelineStore newStore = new EntityGroupFSTimelineStore() {
+ @Override
+ protected AppState getAppState(ApplicationId appId) throws IOException {
+ return appstate;
+ }
+ };
+ newStore.init(config);
+ newStore.setFs(fs);
+ newStore.start();
+ return newStore;
+ }
+
private void createTestFiles(ApplicationId appId, Path attemptDirPath)
throws IOException {
+ createTestFiles(appId, attemptDirPath, mainEntityLogFileName);
+ }
+
+ private void createTestFiles(ApplicationId appId, Path attemptDirPath,
+ String logPath) throws IOException {
TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
PluginStoreTestUtils.writeEntities(entities,
new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME), fs);
@@ -429,7 +516,7 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
PluginStoreTestUtils.writeEntities(entityList,
- new Path(attemptDirPath, mainEntityLogFileName), fs);
+ new Path(attemptDirPath, logPath), fs);
FSDataOutputStream out = fs.create(
new Path(attemptDirPath, TEST_DOMAIN_LOG_FILE_NAME));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org