Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/01/18 02:39:43 UTC

[1/2] hadoop git commit: YARN-4265. Provide new timeline plugin storage to support fine-grained entity caching. Contributed by Li Lu and Jason Lowe (cherry picked from commit 02f597c5db36ded385413958bdee793ad7eda40e) (cherry picked from commit 4a30a44b115ce5164b7bd4689ad78339bf940c00)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 e75ca5213 -> a04f29ca9


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
new file mode 100644
index 0000000..71e26cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.SortedSet;
+
+class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {
+
+  private static TimelineEntityGroupId timelineEntityGroupId
+      = TimelineEntityGroupId.newInstance(
+      TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test");
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
+      NameValuePair primaryFilter,
+      Collection<NameValuePair> secondaryFilters) {
+    return Sets.newHashSet(timelineEntityGroupId);
+  }
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
+      String entityType) {
+    return Sets.newHashSet(timelineEntityGroupId);
+  }
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
+      SortedSet<String> entityIds,
+      Set<String> eventTypes) {
+    return Sets.newHashSet(timelineEntityGroupId);
+  }
+
+  static TimelineEntityGroupId getStandardTimelineGroupId() {
+    return timelineEntityGroupId;
+  }
+}
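
The stub above pins every request to one fixed group id so the tests stay
deterministic. For contrast, here is a hedged sketch of what a production
TimelineEntityGroupPlugin might look like; the AppIdDerivedPlugin name, the
entity-id layout it assumes, and the convention that returning null means
"cannot narrow, fall back to scanning" are all assumptions of this example,
not part of this commit:

    package org.apache.hadoop.yarn.server.timeline;

    import com.google.common.collect.Sets;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    import java.util.Collection;
    import java.util.Set;
    import java.util.SortedSet;

    /** Hypothetical plugin deriving one entity group per application. */
    class AppIdDerivedPlugin extends TimelineEntityGroupPlugin {

      // Assumes entity ids embed the application id, e.g.
      // "application_1234_5678_entity42" maps to application_1234_5678.
      private TimelineEntityGroupId toGroupId(String entityId) {
        String[] parts = entityId.split("_");
        if (parts.length < 4) {
          return null; // id does not carry an application id
        }
        ApplicationId appId = ConverterUtils.toApplicationId(
            parts[0] + "_" + parts[1] + "_" + parts[2]);
        return TimelineEntityGroupId.newInstance(appId, "all");
      }

      @Override
      public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
          String entityId, String entityType) {
        TimelineEntityGroupId groupId = toGroupId(entityId);
        return groupId == null ? null : Sets.newHashSet(groupId);
      }

      @Override
      public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
          String entityType, NameValuePair primaryFilter,
          Collection<NameValuePair> secondaryFilters) {
        return null; // no filter-based narrowing in this sketch
      }

      @Override
      public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
          String entityType, SortedSet<String> entityIds,
          Set<String> eventTypes) {
        return null; // likewise
      }
    }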

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
new file mode 100644
index 0000000..e0379b1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+class PluginStoreTestUtils {
+
+  static FSDataOutputStream createLogFile(Path logPath, FileSystem fs)
+      throws IOException {
+    FSDataOutputStream stream;
+    stream = fs.create(logPath, true);
+    return stream;
+  }
+
+  static ObjectMapper createObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+    mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+    mapper.configure(SerializationConfig.Feature.CLOSE_CLOSEABLE, false);
+    return mapper;
+  }
+
+  /**
+   * Create sample entities for testing.
+   * @return two timeline entities in a {@link TimelineEntities} object
+   */
+  static TimelineEntities generateTestEntities() {
+    TimelineEntities entities = new TimelineEntities();
+    Map<String, Set<Object>> primaryFilters =
+        new HashMap<String, Set<Object>>();
+    Set<Object> l1 = new HashSet<Object>();
+    l1.add("username");
+    Set<Object> l2 = new HashSet<Object>();
+    l2.add(Integer.MAX_VALUE);
+    Set<Object> l3 = new HashSet<Object>();
+    l3.add("123abc");
+    Set<Object> l4 = new HashSet<Object>();
+    l4.add((long) Integer.MAX_VALUE + 1L);
+    primaryFilters.put("user", l1);
+    primaryFilters.put("appname", l2);
+    primaryFilters.put("other", l3);
+    primaryFilters.put("long", l4);
+    Map<String, Object> secondaryFilters = new HashMap<String, Object>();
+    secondaryFilters.put("startTime", 123456);
+    secondaryFilters.put("status", "RUNNING");
+    Map<String, Object> otherInfo1 = new HashMap<String, Object>();
+    otherInfo1.put("info1", "val1");
+    otherInfo1.putAll(secondaryFilters);
+
+    String entityId1 = "id_1";
+    String entityType1 = "type_1";
+    String entityId2 = "id_2";
+    String entityType2 = "type_2";
+
+    Map<String, Set<String>> relatedEntities =
+        new HashMap<String, Set<String>>();
+    relatedEntities.put(entityType2, Collections.singleton(entityId2));
+
+    TimelineEvent ev3 = createEvent(789L, "launch_event", null);
+    TimelineEvent ev4 = createEvent(0L, "init_event", null);
+    List<TimelineEvent> events = new ArrayList<TimelineEvent>();
+    events.add(ev3);
+    events.add(ev4);
+    entities.addEntity(createEntity(entityId2, entityType2, 456L, events, null,
+        null, null, "domain_id_1"));
+
+    TimelineEvent ev1 = createEvent(123L, "start_event", null);
+    entities.addEntity(createEntity(entityId1, entityType1, 123L,
+        Collections.singletonList(ev1), relatedEntities, primaryFilters,
+        otherInfo1, "domain_id_1"));
+    return entities;
+  }
+
+  static void verifyTestEntities(TimelineDataManager tdm)
+      throws YarnException, IOException {
+    TimelineEntity entity1 = tdm.getEntity("type_1", "id_1",
+        EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    TimelineEntity entity2 = tdm.getEntity("type_2", "id_2",
+        EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertNotNull(entity1);
+    assertNotNull(entity2);
+    assertEquals("Failed to read out entity 1",
+        (Long) 123L, entity1.getStartTime());
+    assertEquals("Failed to read out entity 2",
+        (Long) 456L, entity2.getStartTime());
+  }
+
+  /**
+   * Create a test entity.
+   */
+  static TimelineEntity createEntity(String entityId, String entityType,
+      Long startTime, List<TimelineEvent> events,
+      Map<String, Set<String>> relatedEntities,
+      Map<String, Set<Object>> primaryFilters,
+      Map<String, Object> otherInfo, String domainId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId(entityId);
+    entity.setEntityType(entityType);
+    entity.setStartTime(startTime);
+    entity.setEvents(events);
+    if (relatedEntities != null) {
+      for (Map.Entry<String, Set<String>> e : relatedEntities.entrySet()) {
+        for (String v : e.getValue()) {
+          entity.addRelatedEntity(e.getKey(), v);
+        }
+      }
+    } else {
+      entity.setRelatedEntities(null);
+    }
+    entity.setPrimaryFilters(primaryFilters);
+    entity.setOtherInfo(otherInfo);
+    entity.setDomainId(domainId);
+    return entity;
+  }
+
+  /**
+   * Create a test event.
+   */
+  static TimelineEvent createEvent(long timestamp, String type, Map<String,
+      Object> info) {
+    TimelineEvent event = new TimelineEvent();
+    event.setTimestamp(timestamp);
+    event.setEventType(type);
+    event.setEventInfo(info);
+    return event;
+  }
+
+  /**
+   * Write timeline entities to a file system, one JSON entity per line.
+   * @param entities timeline entities to write
+   * @param logPath path of the log file to write to
+   * @param fs file system on which to create the log file
+   * @throws IOException if the file system operations fail
+   */
+  static void writeEntities(TimelineEntities entities, Path logPath,
+      FileSystem fs) throws IOException {
+    FSDataOutputStream outStream = createLogFile(logPath, fs);
+    JsonGenerator jsonGenerator
+        = (new JsonFactory()).createJsonGenerator(outStream);
+    jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+    ObjectMapper objMapper = createObjectMapper();
+    for (TimelineEntity entity : entities.getEntities()) {
+      objMapper.writeValue(jsonGenerator, entity);
+    }
+    outStream.close();
+  }
+
+  static TimelineDataManager getTdmWithStore(Configuration config, TimelineStore store) {
+    TimelineACLsManager aclManager = new TimelineACLsManager(config);
+    TimelineDataManager tdm = new TimelineDataManager(store, aclManager);
+    tdm.init(config);
+    return tdm;
+  }
+
+  static TimelineDataManager getTdmWithMemStore(Configuration config) {
+    TimelineStore store = new MemoryTimelineStore();
+    TimelineDataManager tdm = getTdmWithStore(config, store);
+    return tdm;
+  }
+
+}
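
For orientation, a hedged usage sketch of how these package-private helpers
compose; the local file system, the demo path, and the PluginStoreUtilsDemo
class are illustrative assumptions of this example, and the class would need
to live in the same org.apache.hadoop.yarn.server.timeline package:

    package org.apache.hadoop.yarn.server.timeline;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class PluginStoreUtilsDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path logPath = new Path("/tmp/pluginstore-demo/summarylog-test");

        // Write the two sample entities, one JSON object per line.
        TimelineEntities entities =
            PluginStoreTestUtils.generateTestEntities();
        PluginStoreTestUtils.writeEntities(entities, logPath, fs);

        // Memory-backed TimelineDataManager that a LogInfo parser could
        // then fill from the file just written.
        TimelineDataManager tdm =
            PluginStoreTestUtils.getTdmWithMemStore(conf);
        tdm.close();
      }
    }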

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
new file mode 100644
index 0000000..e43b705
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
+
+  private static final String SAMPLE_APP_NAME = "1234_5678";
+
+  static final ApplicationId TEST_APPLICATION_ID
+      = ConverterUtils.toApplicationId(
+      ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME);
+
+  private static final String TEST_APP_DIR_NAME
+      = TEST_APPLICATION_ID.toString();
+  private static final String TEST_ATTEMPT_DIR_NAME
+      = ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1";
+  private static final String TEST_SUMMARY_LOG_FILE_NAME
+      = EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
+  private static final String TEST_ENTITY_LOG_FILE_NAME
+      = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+          + EntityGroupPlugInForTest.getStandardTimelineGroupId();
+  private static final String TEST_DOMAIN_LOG_FILE_NAME
+      = EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";
+
+  private static final Path TEST_ROOT_DIR
+      = new Path(System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")),
+      TestEntityGroupFSTimelineStore.class.getSimpleName());
+  private static final Path TEST_APP_DIR_PATH
+      = new Path(TEST_ROOT_DIR, TEST_APP_DIR_NAME);
+  private static final Path TEST_ATTEMPT_DIR_PATH
+      = new Path(TEST_APP_DIR_PATH, TEST_ATTEMPT_DIR_NAME);
+  private static final Path TEST_DONE_DIR_PATH
+      = new Path(TEST_ROOT_DIR, "done");
+
+  private static Configuration config = new YarnConfiguration();
+  private static MiniDFSCluster hdfsCluster;
+  private static FileSystem fs;
+  private EntityGroupFSTimelineStore store;
+  private TimelineEntity entityNew;
+
+  @Rule
+  public TestName currTestName = new TestName();
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
+    config.set(
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
+        "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
+    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+        TEST_DONE_DIR_PATH.toString());
+    config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
+    HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+    hdfsCluster
+        = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
+    fs = hdfsCluster.getFileSystem();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    createTestFiles();
+    store = new EntityGroupFSTimelineStore();
+    if (currTestName.getMethodName().contains("Plugin")) {
+      config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
+          EntityGroupPlugInForTest.class.getName());
+    }
+    store.init(config);
+    store.start();
+    store.setFs(fs);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(TEST_APP_DIR_PATH, true);
+    store.stop();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    hdfsCluster.shutdown();
+    FileContext fileContext = FileContext.getLocalFSFileContext();
+    fileContext.delete(new Path(
+        config.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), true);
+  }
+
+  @Test
+  public void testAppLogsScanLogs() throws Exception {
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    appLogs.scanForLogs();
+    List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
+    List<LogInfo> detailLogs = appLogs.getDetailLogs();
+    assertEquals(2, summaryLogs.size());
+    assertEquals(1, detailLogs.size());
+
+    for (LogInfo log : summaryLogs) {
+      String fileName = log.getFilename();
+      assertTrue(fileName.equals(TEST_SUMMARY_LOG_FILE_NAME)
+          || fileName.equals(TEST_DOMAIN_LOG_FILE_NAME));
+    }
+
+    for (LogInfo log : detailLogs) {
+      String fileName = log.getFilename();
+      assertEquals(TEST_ENTITY_LOG_FILE_NAME, fileName);
+    }
+  }
+
+  @Test
+  public void testMoveToDone() throws Exception {
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    Path pathBefore = appLogs.getAppDirPath();
+    appLogs.moveToDone();
+    Path pathAfter = appLogs.getAppDirPath();
+    assertNotEquals(pathBefore, pathAfter);
+    assertTrue(pathAfter.toString().contains(TEST_DONE_DIR_PATH.toString()));
+  }
+
+  @Test
+  public void testParseSummaryLogs() throws Exception {
+    TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    appLogs.scanForLogs();
+    appLogs.parseSummaryLogs(tdm);
+    PluginStoreTestUtils.verifyTestEntities(tdm);
+  }
+
+  @Test
+  public void testCleanLogs() throws Exception {
+    // Create test dirs and files
+    // Irrelevant file, should not be reclaimed
+    Path irrelevantFilePath = new Path(
+        TEST_DONE_DIR_PATH, "irrelevant.log");
+    FSDataOutputStream stream = fs.create(irrelevantFilePath);
+    stream.close();
+    // Irrelevant directory, should not be reclaimed
+    Path irrelevantDirPath = new Path(TEST_DONE_DIR_PATH, "irrelevant");
+    fs.mkdirs(irrelevantDirPath);
+
+    Path doneAppHomeDir = new Path(new Path(TEST_DONE_DIR_PATH, "0000"), "001");
+    // First application, untouched after creation
+    Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME);
+    Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(attemptDirClean);
+    Path filePath = new Path(attemptDirClean, "test.log");
+    stream = fs.create(filePath);
+    stream.close();
+    // Second application, one file touched after creation
+    Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1");
+    Path attemptDirHoldByFile
+        = new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(attemptDirHoldByFile);
+    Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
+    stream = fs.create(filePathHold);
+    stream.close();
+    // Third application, one dir touched after creation
+    Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2");
+    Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(attemptDirHoldByDir);
+    Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
+    fs.mkdirs(dirPathHold);
+    // Fourth application, empty dirs
+    Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3");
+    Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(attemptDirEmpty);
+    Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
+    fs.mkdirs(dirPathEmpty);
+
+    // Should retain all logs after this run
+    EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
+    assertTrue(fs.exists(irrelevantDirPath));
+    assertTrue(fs.exists(irrelevantFilePath));
+    assertTrue(fs.exists(filePath));
+    assertTrue(fs.exists(filePathHold));
+    assertTrue(fs.exists(dirPathHold));
+    assertTrue(fs.exists(dirPathEmpty));
+
+    // Make sure the created dir is old enough
+    Thread.sleep(2000);
+    // Touch the second application
+    stream = fs.append(filePathHold);
+    stream.writeBytes("append");
+    stream.close();
+    // Touch the third application by creating a new dir
+    fs.mkdirs(new Path(dirPathHold, "holdByMe"));
+
+    EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);
+
+    // Verification after the second cleaner call
+    assertTrue(fs.exists(irrelevantDirPath));
+    assertTrue(fs.exists(irrelevantFilePath));
+    assertTrue(fs.exists(filePathHold));
+    assertTrue(fs.exists(dirPathHold));
+    assertTrue(fs.exists(doneAppHomeDir));
+
+    // appDirClean and appDirEmpty should be cleaned up
+    assertFalse(fs.exists(appDirClean));
+    assertFalse(fs.exists(appDirEmpty));
+  }
+
+  @Test
+  public void testPluginRead() throws Exception {
+    // Verify precondition
+    assertEquals(EntityGroupPlugInForTest.class.getName(),
+        store.getConfig().get(
+            YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
+    // Load data and cache item, prepare timeline store by making a cache item
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    EntityCacheItem cacheItem = new EntityCacheItem(config, fs);
+    cacheItem.setAppLogs(appLogs);
+    store.setCachedLogs(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
+    // Generate TDM
+    TimelineDataManager tdm
+        = PluginStoreTestUtils.getTdmWithStore(config, store);
+
+    // Verify single entity read
+    TimelineEntity entity3 = tdm.getEntity("type_3", "id_3",
+        EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertNotNull(entity3);
+    assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+    // Verify multiple entities read
+    TimelineEntities entities = tdm.getEntities("type_3", null, null, null,
+        null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertEquals(1, entities.getEntities().size());
+    for (TimelineEntity entity : entities.getEntities()) {
+      assertEquals(entityNew.getStartTime(), entity.getStartTime());
+    }
+  }
+
+  @Test
+  public void testSummaryRead() throws Exception {
+    // Load data
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    TimelineDataManager tdm
+        = PluginStoreTestUtils.getTdmWithStore(config, store);
+    appLogs.scanForLogs();
+    appLogs.parseSummaryLogs(tdm);
+
+    // Verify single entity read
+    PluginStoreTestUtils.verifyTestEntities(tdm);
+    // Verify multiple entities read
+    TimelineEntities entities = tdm.getEntities("type_1", null, null, null,
+        null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertEquals(1, entities.getEntities().size());
+    for (TimelineEntity entity : entities.getEntities()) {
+      assertEquals((Long) 123L, entity.getStartTime());
+    }
+
+  }
+
+  private void createTestFiles() throws IOException {
+    TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
+    PluginStoreTestUtils.writeEntities(entities,
+        new Path(TEST_ATTEMPT_DIR_PATH, TEST_SUMMARY_LOG_FILE_NAME), fs);
+
+    entityNew = PluginStoreTestUtils
+        .createEntity("id_3", "type_3", 789L, null, null,
+            null, null, "domain_id_1");
+    TimelineEntities entityList = new TimelineEntities();
+    entityList.addEntity(entityNew);
+    PluginStoreTestUtils.writeEntities(entityList,
+        new Path(TEST_ATTEMPT_DIR_PATH, TEST_ENTITY_LOG_FILE_NAME), fs);
+
+    FSDataOutputStream out = fs.create(
+        new Path(TEST_ATTEMPT_DIR_PATH, TEST_DOMAIN_LOG_FILE_NAME));
+    out.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java
new file mode 100644
index 0000000..fa6fcc7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.EnumSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestLogInfo {
+
+  private static final Path TEST_ROOT_DIR = new Path(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")),
+      TestLogInfo.class.getSimpleName());
+
+  private static final String TEST_ATTEMPT_DIR_NAME = "test_app";
+  private static final String TEST_ENTITY_FILE_NAME = "test_entity";
+  private static final String TEST_DOMAIN_FILE_NAME = "test_domain";
+  private static final String TEST_BROKEN_FILE_NAME = "test_broken";
+
+  private Configuration config = new YarnConfiguration();
+  private MiniDFSCluster hdfsCluster;
+  private FileSystem fs;
+  private ObjectMapper objMapper;
+
+  private JsonFactory jsonFactory = new JsonFactory();
+  private JsonGenerator jsonGenerator;
+  private FSDataOutputStream outStream = null;
+  private FSDataOutputStream outStreamDomain = null;
+
+  private TimelineDomain testDomain;
+
+  private static final short FILE_LOG_DIR_PERMISSIONS = 0770;
+
+  @Before
+  public void setup() throws Exception {
+    config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
+    HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+    hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
+    fs = hdfsCluster.getFileSystem();
+    Path testAppDirPath = new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(testAppDirPath, new FsPermission(FILE_LOG_DIR_PERMISSIONS));
+    objMapper = PluginStoreTestUtils.createObjectMapper();
+
+    TimelineEntities testEntities = PluginStoreTestUtils.generateTestEntities();
+    writeEntitiesLeaveOpen(testEntities,
+        new Path(testAppDirPath, TEST_ENTITY_FILE_NAME));
+
+    testDomain = new TimelineDomain();
+    testDomain.setId("domain_1");
+    testDomain.setReaders(UserGroupInformation.getLoginUser().getUserName());
+    testDomain.setOwner(UserGroupInformation.getLoginUser().getUserName());
+    testDomain.setDescription("description");
+    writeDomainLeaveOpen(testDomain,
+        new Path(testAppDirPath, TEST_DOMAIN_FILE_NAME));
+
+    writeBrokenFile(new Path(testAppDirPath, TEST_BROKEN_FILE_NAME));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    jsonGenerator.close();
+    outStream.close();
+    outStreamDomain.close();
+    hdfsCluster.shutdown();
+  }
+
+  @Test
+  public void testMatchesGroupId() throws Exception {
+    String testGroupId = "app1_group1";
+    // Match
+    EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
+        "app1_group1",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertTrue(testLogInfo.matchesGroupId(testGroupId));
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
+        "test_app1_group1",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertTrue(testLogInfo.matchesGroupId(testGroupId));
+    // Unmatch
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2_group1",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertFalse(testLogInfo.matchesGroupId(testGroupId));
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group2",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertFalse(testLogInfo.matchesGroupId(testGroupId));
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group12",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertFalse(testLogInfo.matchesGroupId(testGroupId));
+    // Check delimiters
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1_2",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertTrue(testLogInfo.matchesGroupId(testGroupId));
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1.dat",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertTrue(testLogInfo.matchesGroupId(testGroupId));
+    // Check file names shorter than group id
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertFalse(testLogInfo.matchesGroupId(testGroupId));
+  }
+
+  @Test
+  public void testParseEntity() throws Exception {
+    // Load test data
+    TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
+    EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
+        TEST_ENTITY_FILE_NAME,
+        UserGroupInformation.getLoginUser().getUserName());
+    testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    // Verify for the first batch
+    PluginStoreTestUtils.verifyTestEntities(tdm);
+    // Load new data
+    TimelineEntity entityNew = PluginStoreTestUtils
+        .createEntity("id_3", "type_3", 789L, null, null,
+            null, null, "domain_id_1");
+    TimelineEntities entityList = new TimelineEntities();
+    entityList.addEntity(entityNew);
+    writeEntitiesLeaveOpen(entityList,
+        new Path(new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME),
+            TEST_ENTITY_FILE_NAME));
+    testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    // Verify the newly added data
+    TimelineEntity entity3 = tdm.getEntity(entityNew.getEntityType(),
+        entityNew.getEntityId(), EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertNotNull(entity3);
+    assertEquals("Failed to read out the new entity",
+        entityNew.getStartTime(), entity3.getStartTime());
+    tdm.close();
+  }
+
+  @Test
+  public void testParseBrokenEntity() throws Exception {
+    // Load test data
+    TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
+    EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
+        TEST_BROKEN_FILE_NAME,
+        UserGroupInformation.getLoginUser().getUserName());
+    DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
+        TEST_BROKEN_FILE_NAME,
+        UserGroupInformation.getLoginUser().getUserName());
+    // Try parse, should not fail
+    testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    tdm.close();
+  }
+
+  @Test
+  public void testParseDomain() throws Exception {
+    // Load test data
+    TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
+    DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
+        TEST_DOMAIN_FILE_NAME,
+        UserGroupInformation.getLoginUser().getUserName());
+    domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    // Verify domain data
+    TimelineDomain resultDomain = tdm.getDomain("domain_1",
+        UserGroupInformation.getLoginUser());
+    assertNotNull(resultDomain);
+    assertEquals(testDomain.getReaders(), resultDomain.getReaders());
+    assertEquals(testDomain.getOwner(), resultDomain.getOwner());
+    assertEquals(testDomain.getDescription(), resultDomain.getDescription());
+  }
+
+  private void writeBrokenFile(Path logPath) throws IOException {
+    FSDataOutputStream out = null;
+    try {
+      String broken = "{ broken { [[]} broken";
+      out = PluginStoreTestUtils.createLogFile(logPath, fs);
+      out.write(broken.getBytes(Charset.forName("UTF-8")));
+      out.close();
+      out = null;
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  // TestLogInfo needs to keep the HDFS files it writes open, so it maintains
+  // its own write methods instead of reusing PluginStoreTestUtils
+  private void writeEntitiesLeaveOpen(TimelineEntities entities, Path logPath)
+      throws IOException {
+    if (outStream == null) {
+      outStream = PluginStoreTestUtils.createLogFile(logPath, fs);
+      jsonGenerator = (new JsonFactory()).createJsonGenerator(outStream);
+      jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+    }
+    for (TimelineEntity entity : entities.getEntities()) {
+      objMapper.writeValue(jsonGenerator, entity);
+    }
+    outStream.hflush();
+  }
+
+  private void writeDomainLeaveOpen(TimelineDomain domain, Path logPath)
+      throws IOException {
+    if (outStreamDomain == null) {
+      outStreamDomain = PluginStoreTestUtils.createLogFile(logPath, fs);
+    }
+    // Domain writes use their own JSON generator to stay isolated from entity writers
+    JsonGenerator jsonGeneratorLocal
+        = (new JsonFactory()).createJsonGenerator(outStreamDomain);
+    jsonGeneratorLocal.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+    objMapper.writeValue(jsonGeneratorLocal, domain);
+    outStreamDomain.hflush();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
index 44631f7..d83ac24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
@@ -42,5 +42,6 @@
     <module>hadoop-yarn-server-sharedcachemanager</module>
     <module>hadoop-yarn-server-tests</module>
     <module>hadoop-yarn-server-applicationhistoryservice</module>
+    <module>hadoop-yarn-server-timeline-pluginstorage</module>
   </modules>
 </project>


[2/2] hadoop git commit: YARN-4265. Provide new timeline plugin storage to support fine-grained entity caching. Contributed by Li Lu and Jason Lowe (cherry picked from commit 02f597c5db36ded385413958bdee793ad7eda40e) (cherry picked from commit 4a30a44b115ce5164b7bd4689ad78339bf940c00)

Posted by ju...@apache.org.
YARN-4265. Provide new timeline plugin storage to support fine-grained entity caching. Contributed by Li Lu and Jason Lowe
(cherry picked from commit 02f597c5db36ded385413958bdee793ad7eda40e)
(cherry picked from commit 4a30a44b115ce5164b7bd4689ad78339bf940c00)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a04f29ca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a04f29ca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a04f29ca

Branch: refs/heads/branch-2.8
Commit: a04f29ca90ab9971b1b36e7da4aebf00a8823d75
Parents: e75ca52
Author: Junping Du <ju...@apache.org>
Authored: Sun Jan 17 17:37:40 2016 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Sun Jan 17 17:44:17 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  62 +-
 .../src/main/resources/yarn-default.xml         |  58 ++
 .../pom.xml                                     |  11 +
 .../ApplicationHistoryServer.java               |   5 +-
 .../server/timeline/TimelineDataManager.java    |   1 -
 .../timeline/TimelineDataManagerMetrics.java    |  11 +-
 .../TestApplicationHistoryClientService.java    |   1 +
 ...pplicationHistoryManagerOnTimelineStore.java |   1 +
 .../webapp/TestAHSWebServices.java              |   1 +
 .../timeline/TestTimelineDataManager.java       |   1 +
 .../pom.xml                                     | 136 +++
 .../yarn/server/timeline/EntityCacheItem.java   | 170 ++++
 .../timeline/EntityGroupFSTimelineStore.java    | 895 +++++++++++++++++++
 .../hadoop/yarn/server/timeline/LogInfo.java    | 281 ++++++
 .../timeline/TimelineEntityGroupPlugin.java     |  74 ++
 .../yarn/server/timeline/package-info.java      |  23 +
 .../timeline/EntityGroupPlugInForTest.java      |  56 ++
 .../server/timeline/PluginStoreTestUtils.java   | 208 +++++
 .../TestEntityGroupFSTimelineStore.java         | 332 +++++++
 .../yarn/server/timeline/TestLogInfo.java       | 253 ++++++
 .../hadoop-yarn/hadoop-yarn-server/pom.xml      |   1 +
 22 files changed, 2574 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4d63a6c..9b985a2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -211,6 +211,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4234. New put APIs in TimelineClient for ats v1.5. (Xuan Gong via
     junping_du)
 
+    YARN-4265. Provide new timeline plugin storage to support fine-grained entity
+    caching. (Li Lu and Jason Lowe via junping_du)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6299a38..04f61da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1546,6 +1546,7 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_VERSION = TIMELINE_SERVICE_PREFIX
       + "version";
   public static final float DEFAULT_TIMELINE_SERVICE_VERSION = 1.0f;
+
   /**
   * Comma-separated list of names for UIs hosted in the timeline server
    * (For pluggable UIs).
@@ -1581,17 +1582,70 @@ public class YarnConfiguration extends Configuration {
       TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT =
       "/tmp/entity-file-history/active";
 
+  public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "done-dir";
   public static final String
-      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
-      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT =
+      "/tmp/entity-file-history/done";
+
+  public static final String TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "group-id-plugin-classes";
+
   public static final String
-      DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
-      "2000, 500";
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-store";
 
   public static final String
       TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
       TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";
 
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "scan-interval-seconds";
+  public static final long
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT = 60;
+
+  public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "threads";
+  public static final int
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT = 16;
+
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE
+      = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "app-cache-size";
+  public static final int
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT = 10;
+
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "cleaner-interval-seconds";
+  public static final int
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT =
+        60 * 60;
+
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS
+      = TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retain-seconds";
+  public static final int
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT =
+        7 * 24 * 60 * 60;
+
+  // how old the most recent log of an UNKNOWN app needs to be in the active
+  // directory before we treat it as COMPLETED
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "unknown-active-seconds";
+  public static final int
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT
+      = 24 * 60 * 60;
+
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
+  public static final String
+      DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+      "2000, 500";
+
   public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
       TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
   public static final long
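
As a usage sketch, this is roughly how a consumer would read the new knobs;
the ReadPluginStoreConf class and variable names are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ReadPluginStoreConf {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        long scanIntervalSecs = conf.getLong(
            YarnConfiguration
                .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
            YarnConfiguration
                .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT);
        int appCacheSize = conf.getInt(
            YarnConfiguration
                .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
            YarnConfiguration
                .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT);
        System.out.println("scan every " + scanIntervalSecs
            + "s, cache " + appCacheSize + " apps");
      }
    }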

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 13a7b1b..fe308ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1978,6 +1978,64 @@
     <value>${hadoop.tmp.dir}/yarn/timeline</value>
   </property>
 
+  <!-- Timeline Service v1.5 Configuration -->
+
+  <property>
+    <name>yarn.timeline-service.entity-group-fs-store.active-dir</name>
+    <value>/tmp/entity-file-history/active</value>
+    <description>HDFS path to store active application's timeline data</description>
+  </property>
+
+  <property>
+    <name>yarn.timeline-service.entity-group-fs-store.done-dir</name>
+    <value>/tmp/entity-file-history/done/</value>
+    <description>HDFS path to store done application's timeline data</description>
+  </property>
+
+  <property>
+    <name>yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes</name>
+    <value></value>
+    <description>
+      Comma-separated list of plugin classes that can translate a timeline
+      entity read request into a list of timeline entity group ids.
+    </description>
+  </property>
+
+   <property>
+     <name>yarn.timeline-service.entity-group-fs-store.summary-store</name>
+     <description>Summary storage for ATS v1.5</description>
+     <value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value>
+  </property>
+
+  <property>
+    <name>yarn.timeline-service.entity-group-fs-store.scan-interval-seconds</name>
+    <description>
+      Scan interval for ATS v1.5 entity group file system storage reader. This
+      value controls how frequently the reader will scan the HDFS active
+      directory for application status.
+    </description>
+    <value>60</value>
+  </property>
+
+  <property>
+    <name>yarn.timeline-service.entity-group-fs-store.cleaner-interval-seconds</name>
+    <description>
+      Scan interval for ATS v1.5 entity group file system storage cleaner. This
+      value controls how frequently the cleaner will scan the HDFS done
+      directory for stale application data.
+    </description>
+    <value>3600</value>
+  </property>
+
+  <property>
+    <name>yarn.timeline-service.entity-group-fs-store.retain-seconds</name>
+    <description>
+      How long the ATS v1.5 entity group file system storage will keep an
+      application's data in the done directory.
+    </description>
+    <value>604800</value>
+  </property>
+
   <!--  Shared Cache Configuration -->
 
   <property>
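
These defaults can also be set programmatically. A hedged sketch in Java of
turning the v1.5 storage knobs on; com.example.MyGroupIdPlugin is a
placeholder class name, not something shipped by this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class EnablePluginStore {
      public static Configuration configure() {
        Configuration conf = new YarnConfiguration();
        // Plugin(s) that map read requests to entity group ids.
        conf.set(
            YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
            "com.example.MyGroupIdPlugin"); // placeholder
        // Summary storage, as in yarn-default.xml above.
        conf.set(
            YarnConfiguration
                .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE,
            "org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore");
        // Where completed applications' timeline data is moved.
        conf.set(
            YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
            "/tmp/entity-file-history/done");
        return conf;
      }
    }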

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index 1d3ecef..64814a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -220,6 +220,17 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
index cda84a2..d0af778 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
@@ -233,8 +233,9 @@ public class ApplicationHistoryServer extends CompositeService {
   }
 
   private TimelineDataManager createTimelineDataManager(Configuration conf) {
-    return new TimelineDataManager(
-        timelineStore, new TimelineACLsManager(conf));
+    TimelineACLsManager aclsMgr = new TimelineACLsManager(conf);
+    aclsMgr.setTimelineStore(timelineStore);
+    return new TimelineDataManager(timelineStore, aclsMgr);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
index 23ff8e4..57a9346 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
@@ -67,7 +67,6 @@ public class TimelineDataManager extends AbstractService {
     super(TimelineDataManager.class.getName());
     this.store = store;
     this.timelineACLsManager = timelineACLsManager;
-    timelineACLsManager.setTimelineStore(store);
   }
 
   @Override
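
Note the contract change in this hunk: TimelineDataManager no longer wires
the ACLs manager to the store, so every call site must now do it explicitly
(the test updates below follow the same pattern). A minimal sketch of the
resulting wiring, using a memory store purely for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
    import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
    import org.apache.hadoop.yarn.server.timeline.TimelineStore;
    import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;

    public class WireTimelineDataManager {
      public static TimelineDataManager create(Configuration conf) {
        TimelineStore store = new MemoryTimelineStore();
        TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
        // Previously done inside the TimelineDataManager constructor.
        aclsManager.setTimelineStore(store);
        TimelineDataManager dataManager =
            new TimelineDataManager(store, aclsManager);
        dataManager.init(conf);
        return dataManager;
      }
    }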

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java
index afd5818..3591b39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManagerMetrics.java
@@ -92,12 +92,17 @@ public class TimelineDataManagerMetrics {
         getDomainsOps.value();
   }
 
+  private static TimelineDataManagerMetrics instance = null;
+
   TimelineDataManagerMetrics() {
   }
 
-  public static TimelineDataManagerMetrics create() {
-    MetricsSystem ms = DefaultMetricsSystem.instance();
-    return ms.register(new TimelineDataManagerMetrics());
+  public static synchronized TimelineDataManagerMetrics create() {
+    if (instance == null) {
+      MetricsSystem ms = DefaultMetricsSystem.instance();
+      instance = ms.register(new TimelineDataManagerMetrics());
+    }
+    return instance;
   }
 
   public void incrGetEntitiesOps() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
index 1e98e8d..7ef6eca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryClientService.java
@@ -65,6 +65,7 @@ public class TestApplicationHistoryClientService {
     TimelineStore store =
         TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
     TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
+    aclsManager.setTimelineStore(store);
     dataManager =
         new TimelineDataManager(store, aclsManager);
     dataManager.init(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
index a669f37..dfc5b81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
@@ -96,6 +96,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
   public void setup() throws Exception {
     // Only test the ACLs of the generic history
     TimelineACLsManager aclsManager = new TimelineACLsManager(new YarnConfiguration());
+    aclsManager.setTimelineStore(store);
     TimelineDataManager dataManager =
         new TimelineDataManager(store, aclsManager);
     dataManager.init(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
index f2179b4..20dfe45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
@@ -90,6 +90,7 @@ public class TestAHSWebServices extends JerseyTestBase {
     TimelineStore store =
         TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
     TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
+    aclsManager.setTimelineStore(store);
     TimelineDataManager dataManager =
         new TimelineDataManager(store, aclsManager);
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java
index ace2eb8..8fba54c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestTimelineDataManager.java
@@ -62,6 +62,7 @@ public class TestTimelineDataManager extends TimelineStoreTestUtils {
 
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
     aclsManager = new TimelineACLsManager(conf);
+    aclsManager.setTimelineStore(store);
     dataManaer = new TimelineDataManager(store, aclsManager);
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
new file mode 100644
index 0000000..385ba5d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
@@ -0,0 +1,136 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hadoop-yarn-server</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>3.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+  <name>Apache Hadoop YARN Timeline Plugin Storage</name>
+
+  <properties>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+  </properties>
+
+  <dependencies>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-xc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
new file mode 100644
index 0000000..37a1d8d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Cache item for the timeline server v1.5 reader cache. Each cache item
+ * holds a TimelineStore that can be filled with the data of one entity group.
+ */
+public class EntityCacheItem {
+  private static final Logger LOG
+      = LoggerFactory.getLogger(EntityCacheItem.class);
+
+  private TimelineStore store;
+  private EntityGroupFSTimelineStore.AppLogs appLogs;
+  private long lastRefresh;
+  private Configuration config;
+  private FileSystem fs;
+
+  public EntityCacheItem(Configuration config, FileSystem fs) {
+    this.config = config;
+    this.fs = fs;
+  }
+
+  /**
+   * @return The application logs associated with this cache item; may be null.
+   */
+  public synchronized EntityGroupFSTimelineStore.AppLogs getAppLogs() {
+    return this.appLogs;
+  }
+
+  /**
+   * Set the application logs to this cache item. The entity group should be
+   * associated with this application.
+   *
+   * @param incomingAppLogs the application logs to associate with this cache item
+   */
+  public synchronized void setAppLogs(
+      EntityGroupFSTimelineStore.AppLogs incomingAppLogs) {
+    this.appLogs = incomingAppLogs;
+  }
+
+  /**
+   * @return The timeline store backing this cache item; it may or may not be loaded.
+   */
+  public synchronized TimelineStore getStore() {
+    return store;
+  }
+
+  /**
+   * Refresh this cache item if it needs a refresh. This enforces a rescan of
+   * the appLogs and then loads any new data. The refresh is synchronized with
+   * other operations on the same cache item.
+   *
+   * @param groupId the id of the entity group to load into this cache item
+   * @param aclManager ACLs manager for the temporary timeline data manager
+   * @param jsonFactory JSON factory used to parse the entity log files
+   * @param objMapper object mapper used to parse the entity log files
+   * @return a {@link org.apache.hadoop.yarn.server.timeline.TimelineStore}
+   *         object filled with all entities in the group.
+   * @throws IOException if the entity log files cannot be read
+   */
+  public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
+      TimelineACLsManager aclManager, JsonFactory jsonFactory,
+      ObjectMapper objMapper) throws IOException {
+    if (needRefresh()) {
+      // If an application is not finished, we only update summary logs (and put
+      // new entities into summary storage).
+      // Otherwise, since the application is done, we can update detail logs.
+      if (!appLogs.isDone()) {
+        appLogs.parseSummaryLogs();
+      } else if (appLogs.getDetailLogs().isEmpty()) {
+        appLogs.scanForLogs();
+      }
+      if (!appLogs.getDetailLogs().isEmpty()) {
+        if (store == null) {
+          store = new MemoryTimelineStore();
+          store.init(config);
+          store.start();
+        }
+        TimelineDataManager tdm = new TimelineDataManager(store,
+            aclManager);
+        tdm.init(config);
+        tdm.start();
+        List<LogInfo> removeList = new ArrayList<LogInfo>();
+        for (LogInfo log : appLogs.getDetailLogs()) {
+          LOG.debug("Try refresh logs for {}", log.getFilename());
+          // Only refresh the log that matches the cache id
+          if (log.matchesGroupId(groupId)) {
+            Path appDirPath = appLogs.getAppDirPath();
+            if (fs.exists(log.getPath(appDirPath))) {
+              LOG.debug("Refresh logs for cache id {}", groupId);
+              log.parseForStore(tdm, appDirPath, appLogs.isDone(), jsonFactory,
+                  objMapper, fs);
+            } else {
+              // The log file may have been removed; drop it from the list
+              removeList.add(log);
+              LOG.info("File {} no longer exists, remove it from log list",
+                  log.getPath(appDirPath));
+            }
+          }
+        }
+        appLogs.getDetailLogs().removeAll(removeList);
+        tdm.close();
+      }
+      updateRefreshTimeToNow();
+    } else {
+      LOG.debug("Cache new enough, skip refreshing");
+    }
+    return store;
+  }
+
+  /**
+   * Release the cache item for the given group id.
+   *
+   * @param groupId the id of the entity group whose cached data should be released
+   */
+  public synchronized void releaseCache(TimelineEntityGroupId groupId) {
+    try {
+      if (store != null) {
+        store.close();
+      }
+    } catch (IOException e) {
+      LOG.warn("Error closing timeline store", e);
+    }
+    store = null;
+    // reset offsets so next time logs are re-parsed
+    for (LogInfo log : appLogs.getDetailLogs()) {
+      if (log.getFilename().contains(groupId.toString())) {
+        log.setOffset(0);
+      }
+    }
+  }
+
+  private boolean needRefresh() {
+    return (Time.monotonicNow() - lastRefresh > 10000);
+  }
+
+  private void updateRefreshTimeToNow() {
+    this.lastRefresh = Time.monotonicNow();
+  }
+}

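[Note, not part of the patch] A condensed sketch of how a cache item is
driven, mirroring the getCachedStore() flow in EntityGroupFSTimelineStore
below (error handling omitted):

    EntityCacheItem item = new EntityCacheItem(conf, fs);
    item.setAppLogs(appLogs);   // logs of the application owning this group
    // Re-parses the detail logs matching groupId when the item is older than
    // the hard-coded 10-second freshness window, and serves the entities from
    // an internal MemoryTimelineStore:
    TimelineStore store = item.refreshCache(groupId, aclManager,
        jsonFactory, objMapper);
    // On eviction from the LRU application cache:
    item.releaseCache(groupId);   // closes the store, resets log offsets
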
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
new file mode 100644
index 0000000..b1fbd13
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java
@@ -0,0 +1,895 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.MappingJsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Plugin timeline storage to support the timeline server v1.5 API. This
+ * storage uses a file system to store timeline entities, organized by group.
+ */
+public class EntityGroupFSTimelineStore extends AbstractService
+    implements TimelineStore {
+
+  static final String DOMAIN_LOG_PREFIX = "domainlog-";
+  static final String SUMMARY_LOG_PREFIX = "summarylog-";
+  static final String ENTITY_LOG_PREFIX = "entitylog-";
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntityGroupFSTimelineStore.class);
+  private static final FsPermission ACTIVE_DIR_PERMISSION =
+      new FsPermission((short) 01777);
+  private static final FsPermission DONE_DIR_PERMISSION =
+      new FsPermission((short) 0700);
+
+  private static final EnumSet<YarnApplicationState>
+      APP_FINAL_STATES = EnumSet.of(
+      YarnApplicationState.FAILED,
+      YarnApplicationState.KILLED,
+      YarnApplicationState.FINISHED);
+  // Active dir: <activeRoot>/appId/attemptId/cacheId.log
+  // Done dir: <doneRoot>/cluster_ts/hash1/hash2/appId/attemptId/cacheId.log
+  private static final String APP_DONE_DIR_PREFIX_FORMAT =
+      "%d" + Path.SEPARATOR     // cluster timestamp
+          + "%04d" + Path.SEPARATOR // app num / 1,000,000
+          + "%03d" + Path.SEPARATOR // (app num / 1000) % 1000
+          + "%s" + Path.SEPARATOR; // full app id
+
+  private YarnClient yarnClient;
+  private TimelineStore summaryStore;
+  private TimelineACLsManager aclManager;
+  private TimelineDataManager summaryTdm;
+  private ConcurrentMap<ApplicationId, AppLogs> appIdLogMap =
+      new ConcurrentHashMap<ApplicationId, AppLogs>();
+  private ScheduledThreadPoolExecutor executor;
+  private FileSystem fs;
+  private ObjectMapper objMapper;
+  private JsonFactory jsonFactory;
+  private Path activeRootPath;
+  private Path doneRootPath;
+  private long logRetainMillis;
+  private long unknownActiveMillis;
+  private int appCacheMaxSize = 0;
+  private List<TimelineEntityGroupPlugin> cacheIdPlugins;
+  private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
+
+  public EntityGroupFSTimelineStore() {
+    super(EntityGroupFSTimelineStore.class.getSimpleName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    summaryStore = createSummaryStore();
+    summaryStore.init(conf);
+    long logRetainSecs = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT);
+    logRetainMillis = logRetainSecs * 1000;
+    LOG.info("Cleaner set to delete logs older than {} seconds", logRetainSecs);
+    long unknownActiveSecs = conf.getLong(
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS,
+        YarnConfiguration.
+            TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_UNKNOWN_ACTIVE_SECONDS_DEFAULT
+    );
+    unknownActiveMillis = unknownActiveSecs * 1000;
+    LOG.info("Unknown apps will be treated as complete after {} seconds",
+        unknownActiveSecs);
+    appCacheMaxSize = conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_APP_CACHE_SIZE_DEFAULT);
+    LOG.info("Application cache size is {}", appCacheMaxSize);
+    cachedLogs = Collections.synchronizedMap(
+      new LinkedHashMap<TimelineEntityGroupId, EntityCacheItem>(
+          appCacheMaxSize + 1, 0.75f, true) {
+          @Override
+          protected boolean removeEldestEntry(
+              Map.Entry<TimelineEntityGroupId, EntityCacheItem> eldest) {
+            if (super.size() > appCacheMaxSize) {
+              TimelineEntityGroupId groupId = eldest.getKey();
+              LOG.debug("Evicting {} due to space limitations", groupId);
+              EntityCacheItem cacheItem = eldest.getValue();
+              cacheItem.releaseCache(groupId);
+              if (cacheItem.getAppLogs().isDone()) {
+                appIdLogMap.remove(groupId.getApplicationId());
+              }
+              return true;
+            }
+            return false;
+          }
+      });
+    cacheIdPlugins = loadPlugIns(conf);
+    // Initialize yarn client for application status
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(conf);
+    super.serviceInit(conf);
+  }
+
+  private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
+      throws RuntimeException {
+    Collection<String> pluginNames = conf.getStringCollection(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
+    List<TimelineEntityGroupPlugin> pluginList
+        = new LinkedList<TimelineEntityGroupPlugin>();
+    for (final String name : pluginNames) {
+      LOG.debug("Trying to load plugin class {}", name);
+      TimelineEntityGroupPlugin cacheIdPlugin = null;
+      try {
+        Class<?> clazz = conf.getClassByName(name);
+        cacheIdPlugin =
+            (TimelineEntityGroupPlugin) ReflectionUtils.newInstance(
+                clazz, conf);
+      } catch (Exception e) {
+        LOG.warn("Error loading plugin " + name, e);
+      }
+
+      if (cacheIdPlugin == null) {
+        throw new RuntimeException("No class defined for " + name);
+      }
+      LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
+      pluginList.add(cacheIdPlugin);
+    }
+    return pluginList;
+  }
+
+  private TimelineStore createSummaryStore() {
+    return ReflectionUtils.newInstance(getConfig().getClass(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE,
+        LeveldbTimelineStore.class, TimelineStore.class), getConfig());
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting {}", getName());
+    yarnClient.start();
+    summaryStore.start();
+
+    Configuration conf = getConfig();
+    aclManager = new TimelineACLsManager(conf);
+    aclManager.setTimelineStore(summaryStore);
+    summaryTdm = new TimelineDataManager(summaryStore, aclManager);
+    summaryTdm.init(conf);
+    summaryTdm.start();
+    activeRootPath = new Path(conf.get(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
+    doneRootPath = new Path(conf.get(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
+    fs = activeRootPath.getFileSystem(conf);
+    if (!fs.exists(activeRootPath)) {
+      fs.mkdirs(activeRootPath);
+      fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION);
+    }
+    if (!fs.exists(doneRootPath)) {
+      fs.mkdirs(doneRootPath);
+      fs.setPermission(doneRootPath, DONE_DIR_PERMISSION);
+    }
+
+    objMapper = new ObjectMapper();
+    objMapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+    jsonFactory = new MappingJsonFactory(objMapper);
+    final long scanIntervalSecs = conf.getLong(
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
+    );
+    final long cleanerIntervalSecs = conf.getLong(
+        YarnConfiguration
+          .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS,
+        YarnConfiguration
+          .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_CLEANER_INTERVAL_SECONDS_DEFAULT
+    );
+    final int numThreads = conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT);
+    LOG.info("Scanning active directory every {} seconds", scanIntervalSecs);
+    LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs);
+
+    executor = new ScheduledThreadPoolExecutor(numThreads,
+        new ThreadFactoryBuilder().setNameFormat("EntityLogPluginWorker #%d")
+            .build());
+    executor.scheduleAtFixedRate(new EntityLogScanner(), 0, scanIntervalSecs,
+        TimeUnit.SECONDS);
+    executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs,
+        cleanerIntervalSecs, TimeUnit.SECONDS);
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping {}", getName());
+    if (executor != null) {
+      executor.shutdown();
+      if (executor.isTerminating()) {
+        LOG.info("Waiting for executor to terminate");
+        boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
+        if (terminated) {
+          LOG.info("Executor terminated");
+        } else {
+          LOG.warn("Executor did not terminate");
+          executor.shutdownNow();
+        }
+      }
+    }
+    if (summaryTdm != null) {
+      summaryTdm.stop();
+    }
+    if (summaryStore != null) {
+      summaryStore.stop();
+    }
+    if (yarnClient != null) {
+      yarnClient.stop();
+    }
+    synchronized (cachedLogs) {
+      for (EntityCacheItem cacheItem : cachedLogs.values()) {
+        cacheItem.getStore().close();
+      }
+    }
+    super.serviceStop();
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  void scanActiveLogs() throws IOException {
+    RemoteIterator<FileStatus> iter = fs.listStatusIterator(activeRootPath);
+    while (iter.hasNext()) {
+      FileStatus stat = iter.next();
+      ApplicationId appId = parseApplicationId(stat.getPath().getName());
+      if (appId != null) {
+        LOG.debug("scan logs for {} in {}", appId, stat.getPath());
+        AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
+        executor.execute(new ActiveLogParser(logs));
+      }
+    }
+  }
+
+  private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId,
+      Path appDirPath, AppState appState) {
+    AppLogs appLogs = new AppLogs(appId, appDirPath, appState);
+    AppLogs oldAppLogs = appIdLogMap.putIfAbsent(appId, appLogs);
+    if (oldAppLogs != null) {
+      appLogs = oldAppLogs;
+    }
+    return appLogs;
+  }
+
+  private AppLogs getAndSetActiveLog(ApplicationId appId, Path appDirPath) {
+    AppLogs appLogs = appIdLogMap.get(appId);
+    if (appLogs == null) {
+      appLogs = createAndPutAppLogsIfAbsent(appId, appDirPath, AppState.ACTIVE);
+    }
+    return appLogs;
+  }
+
+  // searches for the app logs and returns them if found, else null
+  private AppLogs getAndSetAppLogs(ApplicationId applicationId)
+      throws IOException {
+    LOG.debug("Looking for app logs mapped for app id {}", applicationId);
+    AppLogs appLogs = appIdLogMap.get(applicationId);
+    if (appLogs == null) {
+      AppState appState = AppState.UNKNOWN;
+      Path appDirPath = getDoneAppPath(applicationId);
+      if (fs.exists(appDirPath)) {
+        appState = AppState.COMPLETED;
+      } else {
+        appDirPath = getActiveAppPath(applicationId);
+        if (fs.exists(appDirPath)) {
+          appState = AppState.ACTIVE;
+        }
+      }
+      if (appState != AppState.UNKNOWN) {
+        LOG.debug("Create and try to add new appLogs to appIdLogMap for {}",
+            applicationId);
+        appLogs = createAndPutAppLogsIfAbsent(
+            applicationId, appDirPath, appState);
+      }
+    }
+    return appLogs;
+  }
+
+  /**
+   * Main function for the entity log cleaner. It performs a depth-first search
+   * from the given dir path for application log dirs; once one is found, it
+   * decides whether the directory should be cleaned up and, if so, removes it.
+   *
+   * @param dirpath the root directory the cleaner should start with. Note that
+   *                dirpath should be a directory that contains a set of
+   *                application log directories. The cleaner method will not
+   *                work if the given dirpath itself is an application log dir.
+   * @param fs the file system used to scan and delete the log directories
+   * @param retainMillis log dirs without an entry newer than this age (in ms) are removed
+   * @throws IOException if listing the directory tree fails
+   */
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
+      throws IOException {
+    long now = Time.now();
+    // Depth first search from root directory for all application log dirs
+    RemoteIterator<FileStatus> iter = fs.listStatusIterator(dirpath);
+    while (iter.hasNext()) {
+      FileStatus stat = iter.next();
+      if (stat.isDirectory()) {
+        // If current is an application log dir, decide if we need to remove it
+        // and remove if necessary.
+        // Otherwise, keep iterating into it.
+        ApplicationId appId = parseApplicationId(dirpath.getName());
+        if (appId != null) { // Application log dir
+          if (shouldCleanAppLogDir(dirpath, now, fs, retainMillis)) {
+            try {
+              LOG.info("Deleting {}", dirpath);
+              if (!fs.delete(dirpath, true)) {
+                LOG.error("Unable to remove " + dirpath);
+              }
+            } catch (IOException e) {
+              LOG.error("Unable to remove " + dirpath, e);
+            }
+          }
+        } else { // Keep cleaning inside
+          cleanLogs(stat.getPath(), fs, retainMillis);
+        }
+      }
+    }
+  }
+
+  private static boolean shouldCleanAppLogDir(Path appLogPath, long now,
+      FileSystem fs, long logRetainMillis) throws IOException {
+    RemoteIterator<FileStatus> iter = fs.listStatusIterator(appLogPath);
+    while (iter.hasNext()) {
+      FileStatus stat = iter.next();
+      if (now - stat.getModificationTime() <= logRetainMillis) {
+        // found a dir entry that is fresh enough to prevent
+        // cleaning this directory.
+        LOG.debug("{} not being cleaned due to {}", appLogPath, stat.getPath());
+        return false;
+      }
+      // Otherwise, keep searching files inside for directories.
+      if (stat.isDirectory()) {
+        if (!shouldCleanAppLogDir(stat.getPath(), now, fs, logRetainMillis)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  // converts the string to an ApplicationId, or null if conversion fails
+  private static ApplicationId parseApplicationId(String appIdStr) {
+    ApplicationId appId = null;
+    if (appIdStr.startsWith(ApplicationId.appIdStrPrefix)) {
+      try {
+        appId = ConverterUtils.toApplicationId(appIdStr);
+      } catch (IllegalArgumentException e) {
+        appId = null;
+      }
+    }
+    return appId;
+  }
+
+  private Path getActiveAppPath(ApplicationId appId) {
+    return new Path(activeRootPath, appId.toString());
+  }
+
+  private Path getDoneAppPath(ApplicationId appId) {
+    // cut up the app ID into mod(1000) buckets
+    int appNum = appId.getId();
+    appNum /= 1000;
+    int bucket2 = appNum % 1000;
+    int bucket1 = appNum / 1000;
+    return new Path(doneRootPath,
+        String.format(APP_DONE_DIR_PREFIX_FORMAT, appId.getClusterTimestamp(),
+            bucket1, bucket2, appId.toString()));
+  }
+
+  // This method has to be synchronized to control traffic to RM
+  private static synchronized AppState getAppState(ApplicationId appId,
+      YarnClient yarnClient) throws IOException {
+    AppState appState = AppState.ACTIVE;
+    try {
+      ApplicationReport report = yarnClient.getApplicationReport(appId);
+      YarnApplicationState yarnState = report.getYarnApplicationState();
+      if (APP_FINAL_STATES.contains(yarnState)) {
+        appState = AppState.COMPLETED;
+      }
+    } catch (ApplicationNotFoundException e) {
+      appState = AppState.UNKNOWN;
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+    return appState;
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  enum AppState {
+    ACTIVE,
+    UNKNOWN,
+    COMPLETED
+  }
+
+  class AppLogs {
+    private ApplicationId appId;
+    private Path appDirPath;
+    private AppState appState;
+    private List<LogInfo> summaryLogs = new ArrayList<LogInfo>();
+    private List<LogInfo> detailLogs = new ArrayList<LogInfo>();
+
+    public AppLogs(ApplicationId appId, Path appPath, AppState state) {
+      this.appId = appId;
+      appDirPath = appPath;
+      appState = state;
+    }
+
+    public synchronized boolean isDone() {
+      return appState == AppState.COMPLETED;
+    }
+
+    public synchronized ApplicationId getAppId() {
+      return appId;
+    }
+
+    public synchronized Path getAppDirPath() {
+      return appDirPath;
+    }
+
+    synchronized List<LogInfo> getSummaryLogs() {
+      return summaryLogs;
+    }
+
+    synchronized List<LogInfo> getDetailLogs() {
+      return detailLogs;
+    }
+
+    public synchronized void parseSummaryLogs() throws IOException {
+      parseSummaryLogs(summaryTdm);
+    }
+
+    @InterfaceAudience.Private
+    @VisibleForTesting
+    synchronized void parseSummaryLogs(TimelineDataManager tdm)
+        throws IOException {
+      if (!isDone()) {
+        LOG.debug("Try to parse summary log for log {} in {}",
+            appId, appDirPath);
+        appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient);
+        long recentLogModTime = scanForLogs();
+        if (appState == AppState.UNKNOWN) {
+          if (Time.now() - recentLogModTime > unknownActiveMillis) {
+            LOG.info(
+                "{} state is UNKNOWN and logs are stale, assuming COMPLETED",
+                appId);
+            appState = AppState.COMPLETED;
+          }
+        }
+      }
+      List<LogInfo> removeList = new ArrayList<LogInfo>();
+      for (LogInfo log : summaryLogs) {
+        if (fs.exists(log.getPath(appDirPath))) {
+          log.parseForStore(tdm, appDirPath, isDone(), jsonFactory,
+              objMapper, fs);
+        } else {
+          // The log file may have been removed; drop it from the list
+          removeList.add(log);
+          LOG.info("File {} no longer exists, remove it from log list",
+              log.getPath(appDirPath));
+        }
+      }
+      summaryLogs.removeAll(removeList);
+    }
+
+    // scans for new logs and returns the modification timestamp of the
+    // most recently modified log
+    @InterfaceAudience.Private
+    @VisibleForTesting
+    long scanForLogs() throws IOException {
+      LOG.debug("scanForLogs on {}", appDirPath);
+      long newestModTime = 0;
+      RemoteIterator<FileStatus> iterAttempt =
+          fs.listStatusIterator(appDirPath);
+      while (iterAttempt.hasNext()) {
+        FileStatus statAttempt = iterAttempt.next();
+        LOG.debug("scanForLogs on {}", statAttempt.getPath().getName());
+        if (!statAttempt.isDirectory()
+            || !statAttempt.getPath().getName()
+            .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+          LOG.debug("Scanner skips for unknown dir/file {}",
+              statAttempt.getPath());
+          continue;
+        }
+        String attemptDirName = statAttempt.getPath().getName();
+        RemoteIterator<FileStatus> iterCache
+            = fs.listStatusIterator(statAttempt.getPath());
+        while (iterCache.hasNext()) {
+          FileStatus statCache = iterCache.next();
+          if (!statCache.isFile()) {
+            continue;
+          }
+          String filename = statCache.getPath().getName();
+          // We should only update time for log files.
+          boolean shouldSetTime = true;
+          LOG.debug("scan for log file: {}", filename);
+          if (filename.startsWith(DOMAIN_LOG_PREFIX)) {
+            addSummaryLog(attemptDirName, filename, statCache.getOwner(), true);
+          } else if (filename.startsWith(SUMMARY_LOG_PREFIX)) {
+            addSummaryLog(attemptDirName, filename, statCache.getOwner(),
+                false);
+          } else if (filename.startsWith(ENTITY_LOG_PREFIX)) {
+            addDetailLog(attemptDirName, filename, statCache.getOwner());
+          } else {
+            shouldSetTime = false;
+          }
+          if (shouldSetTime) {
+            newestModTime
+              = Math.max(statCache.getModificationTime(), newestModTime);
+          }
+        }
+      }
+
+      // if there are no logs in the directory then use the modification
+      // time of the directory itself
+      if (newestModTime == 0) {
+        newestModTime = fs.getFileStatus(appDirPath).getModificationTime();
+      }
+
+      return newestModTime;
+    }
+
+    private void addSummaryLog(String attemptDirName,
+        String filename, String owner, boolean isDomainLog) {
+      for (LogInfo log : summaryLogs) {
+        if (log.getFilename().equals(filename)
+            && log.getAttemptDirName().equals(attemptDirName)) {
+          return;
+        }
+      }
+      LOG.debug("Incoming log {} not present in my summaryLogs list, add it",
+          filename);
+      LogInfo log;
+      if (isDomainLog) {
+        log = new DomainLogInfo(attemptDirName, filename, owner);
+      } else {
+        log = new EntityLogInfo(attemptDirName, filename, owner);
+      }
+      summaryLogs.add(log);
+    }
+
+    private void addDetailLog(String attemptDirName, String filename,
+        String owner) {
+      for (LogInfo log : detailLogs) {
+        if (log.getFilename().equals(filename)
+            && log.getAttemptDirName().equals(attemptDirName)) {
+          return;
+        }
+      }
+      detailLogs.add(new EntityLogInfo(attemptDirName, filename, owner));
+    }
+
+    public synchronized void moveToDone() throws IOException {
+      Path doneAppPath = getDoneAppPath(appId);
+      if (!doneAppPath.equals(appDirPath)) {
+        Path donePathParent = doneAppPath.getParent();
+        if (!fs.exists(donePathParent)) {
+          fs.mkdirs(donePathParent);
+        }
+        LOG.debug("Application {} is done, trying to move to done dir {}",
+            appId, doneAppPath);
+        if (!fs.rename(appDirPath, doneAppPath)) {
+          throw new IOException("Rename " + appDirPath + " to " + doneAppPath
+              + " failed");
+        } else {
+          LOG.info("Moved {} to {}", appDirPath, doneAppPath);
+        }
+        appDirPath = doneAppPath;
+      }
+    }
+  }
+
+  private class EntityLogScanner implements Runnable {
+    @Override
+    public void run() {
+      LOG.debug("Active scan starting");
+      try {
+        scanActiveLogs();
+      } catch (Exception e) {
+        LOG.error("Error scanning active files", e);
+      }
+      LOG.debug("Active scan complete");
+    }
+  }
+
+  private class ActiveLogParser implements Runnable {
+    private AppLogs appLogs;
+
+    public ActiveLogParser(AppLogs logs) {
+      appLogs = logs;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.debug("Begin parsing summary logs. ");
+        appLogs.parseSummaryLogs();
+        if (appLogs.isDone()) {
+          appLogs.moveToDone();
+          appIdLogMap.remove(appLogs.getAppId());
+        }
+        LOG.debug("End parsing summary logs. ");
+      } catch (Exception e) {
+        LOG.error("Error processing logs for " + appLogs.getAppId(), e);
+      }
+    }
+  }
+
+  private class EntityLogCleaner implements Runnable {
+    @Override
+    public void run() {
+      LOG.debug("Cleaner starting");
+      try {
+        cleanLogs(doneRootPath, fs, logRetainMillis);
+      } catch (Exception e) {
+        LOG.error("Error cleaning files", e);
+      }
+      LOG.debug("Cleaner finished");
+    }
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  void setFs(FileSystem incomingFs) {
+    this.fs = incomingFs;
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  void setCachedLogs(TimelineEntityGroupId groupId, EntityCacheItem cacheItem) {
+    cachedLogs.put(groupId, cacheItem);
+  }
+
+  private List<TimelineStore> getTimelineStoresFromCacheIds(
+      Set<TimelineEntityGroupId> groupIds, String entityType)
+      throws IOException {
+    List<TimelineStore> stores = new LinkedList<TimelineStore>();
+    // For now we just handle one store in a context. We collect every
+    // non-null store returned for the given group ids.
+    for (TimelineEntityGroupId groupId : groupIds) {
+      TimelineStore storeForId = getCachedStore(groupId);
+      if (storeForId != null) {
+        LOG.debug("Adding {} as a store for the query", storeForId.getName());
+        stores.add(storeForId);
+      }
+    }
+    if (stores.size() == 0) {
+      LOG.debug("Using summary store for {}", entityType);
+      stores.add(this.summaryStore);
+    }
+    return stores;
+  }
+
+  private List<TimelineStore> getTimelineStoresForRead(String entityId,
+      String entityType) throws IOException {
+    Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
+    for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
+      LOG.debug("Trying plugin {} for id {} and type {}",
+          cacheIdPlugin.getClass().getName(), entityId, entityType);
+      Set<TimelineEntityGroupId> idsFromPlugin
+          = cacheIdPlugin.getTimelineEntityGroupId(entityId, entityType);
+      if (idsFromPlugin == null) {
+        LOG.debug("Plugin returned null " + cacheIdPlugin.getClass().getName());
+      } else {
+        LOG.debug("Plugin returned ids: " + idsFromPlugin);
+      }
+
+      if (idsFromPlugin != null) {
+        groupIds.addAll(idsFromPlugin);
+        LOG.debug("plugin {} returns a non-null value on query",
+            cacheIdPlugin.getClass().getName());
+      }
+    }
+    return getTimelineStoresFromCacheIds(groupIds, entityType);
+  }
+
+  private List<TimelineStore> getTimelineStoresForRead(String entityType,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
+      throws IOException {
+    Set<TimelineEntityGroupId> groupIds = new HashSet<TimelineEntityGroupId>();
+    for (TimelineEntityGroupPlugin cacheIdPlugin : cacheIdPlugins) {
+      Set<TimelineEntityGroupId> idsFromPlugin =
+          cacheIdPlugin.getTimelineEntityGroupId(entityType, primaryFilter,
+              secondaryFilters);
+      if (idsFromPlugin != null) {
+        LOG.debug("plugin {} returns a non-null value on query {}",
+            cacheIdPlugin.getClass().getName(), idsFromPlugin);
+        groupIds.addAll(idsFromPlugin);
+      }
+    }
+    return getTimelineStoresFromCacheIds(groupIds, entityType);
+  }
+
+  // find a cached timeline store or null if it cannot be located
+  private TimelineStore getCachedStore(TimelineEntityGroupId groupId)
+      throws IOException {
+    EntityCacheItem cacheItem;
+    synchronized (this.cachedLogs) {
+      // Note that the content in the cache log storage may be stale.
+      cacheItem = this.cachedLogs.get(groupId);
+      if (cacheItem == null) {
+        LOG.debug("Set up new cache item for id {}", groupId);
+        cacheItem = new EntityCacheItem(getConfig(), fs);
+        AppLogs appLogs = getAndSetAppLogs(groupId.getApplicationId());
+        if (appLogs != null) {
+          LOG.debug("Set applogs {} for group id {}", appLogs, groupId);
+          cacheItem.setAppLogs(appLogs);
+          this.cachedLogs.put(groupId, cacheItem);
+        } else {
+          LOG.warn("AppLogs for groupId {} is set to null!", groupId);
+        }
+      }
+    }
+    TimelineStore store = null;
+    if (cacheItem.getAppLogs() != null) {
+      AppLogs appLogs = cacheItem.getAppLogs();
+      LOG.debug("try refresh cache {} {}", groupId, appLogs.getAppId());
+      store = cacheItem.refreshCache(groupId, aclManager, jsonFactory,
+          objMapper);
+    } else {
+      LOG.warn("AppLogs for group id {} is null", groupId);
+    }
+    return store;
+  }
+
+  @Override
+  public TimelineEntities getEntities(String entityType, Long limit,
+      Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException {
+    LOG.debug("getEntities type={} primary={}", entityType, primaryFilter);
+    List<TimelineStore> stores = getTimelineStoresForRead(entityType,
+        primaryFilter, secondaryFilters);
+    TimelineEntities returnEntities = new TimelineEntities();
+    for (TimelineStore store : stores) {
+      LOG.debug("Try timeline store {} for the request", store.getName());
+      returnEntities.addEntities(
+          store.getEntities(entityType, limit, windowStart, windowEnd, fromId,
+              fromTs, primaryFilter, secondaryFilters, fieldsToRetrieve,
+              checkAcl).getEntities());
+    }
+    return returnEntities;
+  }
+
+  @Override
+  public TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    LOG.debug("getEntity type={} id={}", entityType, entityId);
+    List<TimelineStore> stores = getTimelineStoresForRead(entityId, entityType);
+    for (TimelineStore store : stores) {
+      LOG.debug("Try timeline store {}:{} for the request", store.getName(),
+          store.toString());
+      TimelineEntity e =
+          store.getEntity(entityId, entityType, fieldsToRetrieve);
+      if (e != null) {
+        return e;
+      }
+    }
+    LOG.debug("getEntity: Found nothing");
+    return null;
+  }
+
+  @Override
+  public TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd, Set<String> eventTypes) throws IOException {
+    LOG.debug("getEntityTimelines type={} ids={}", entityType, entityIds);
+    TimelineEvents returnEvents = new TimelineEvents();
+    for (String entityId : entityIds) {
+      LOG.debug("getEntityTimeline type={} id={}", entityType, entityId);
+      List<TimelineStore> stores
+          = getTimelineStoresForRead(entityId, entityType);
+      for (TimelineStore store : stores) {
+        LOG.debug("Try timeline store {}:{} for the request", store.getName(),
+            store.toString());
+        SortedSet<String> entityIdSet = new TreeSet<>();
+        entityIdSet.add(entityId);
+        TimelineEvents events =
+            store.getEntityTimelines(entityType, entityIdSet, limit,
+                windowStart, windowEnd, eventTypes);
+        returnEvents.addEvents(events.getAllEvents());
+      }
+    }
+    return returnEvents;
+  }
+
+  @Override
+  public TimelineDomain getDomain(String domainId) throws IOException {
+    return summaryStore.getDomain(domainId);
+  }
+
+  @Override
+  public TimelineDomains getDomains(String owner) throws IOException {
+    return summaryStore.getDomains(owner);
+  }
+
+  @Override
+  public TimelinePutResponse put(TimelineEntities data) throws IOException {
+    return summaryStore.put(data);
+  }
+
+  @Override
+  public void put(TimelineDomain domain) throws IOException {
+    summaryStore.put(domain);
+  }
+}

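[Note, not part of the patch] A worked example of the done-dir bucketing in
getDoneAppPath(), with an invented application id:

    // application_1452301234567_1234567  =>  appId.getId() = 1234567
    // appNum  = 1234567 / 1000 = 1234
    // bucket1 = 1234 / 1000 = 1      ("%04d" -> "0001")
    // bucket2 = 1234 % 1000 = 234    ("%03d" -> "234")
    String prefix = String.format("%d/%04d/%03d/%s/",
        1452301234567L, 1, 234, "application_1452301234567_1234567");
    // => "1452301234567/0001/234/application_1452301234567_1234567/"
    // i.e. <doneRoot>/1452301234567/0001/234/<appId>/<attemptId>/<cacheId>.log
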
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
new file mode 100644
index 0000000..4caed8d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.MappingIterator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+abstract class LogInfo {
+  public static final String ENTITY_FILE_NAME_DELIMITERS = "_.";
+
+  public String getAttemptDirName() {
+    return attemptDirName;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public void setOffset(long newOffset) {
+    this.offset = newOffset;
+  }
+
+  private String attemptDirName;
+  private String filename;
+  private String user;
+  private long offset = 0;
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogInfo.class);
+
+  public LogInfo(String attemptDirName, String file, String owner) {
+    this.attemptDirName = attemptDirName;
+    filename = file;
+    user = owner;
+  }
+
+  public Path getPath(Path rootPath) {
+    Path attemptPath = new Path(rootPath, attemptDirName);
+    return new Path(attemptPath, filename);
+  }
+
+  public String getFilename() {
+    return filename;
+  }
+
+  public boolean matchesGroupId(TimelineEntityGroupId groupId) {
+    return matchesGroupId(groupId.toString());
+  }
+
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  boolean matchesGroupId(String groupId) {
+    // Return true if the group id is a segment (separated by _, ., or end of
+    // string) of the file name.
+    int pos = filename.indexOf(groupId);
+    if (pos < 0) {
+      return false;
+    }
+    return filename.length() == pos + groupId.length()
+        || ENTITY_FILE_NAME_DELIMITERS.contains(String.valueOf(
+        filename.charAt(pos + groupId.length())
+    ));
+  }
+
+  public void parseForStore(TimelineDataManager tdm, Path appDirPath,
+      boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
+      FileSystem fs) throws IOException {
+    LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
+        attemptDirName);
+    Path logPath = getPath(appDirPath);
+    if (fs.exists(logPath)) {
+      long startTime = Time.monotonicNow();
+      try {
+        LOG.debug("Parsing {} at offset {}", logPath, offset);
+        long count = parsePath(tdm, logPath, appCompleted, jsonFactory,
+            objMapper, fs);
+        LOG.info("Parsed {} entities from {} in {} msec",
+            count, logPath, Time.monotonicNow() - startTime);
+      } catch (RuntimeException e) {
+        if (e.getCause() instanceof JsonParseException) {
+          // If AppLogs cannot parse this log, it may be corrupted; skip it
+          LOG.info("Log {} appears to be corrupted. Skipping.", logPath);
+        } else {
+          throw e;
+        }
+      }
+    } else {
+      LOG.warn("{} no longer exists. Skipping scan.", logPath);
+    }
+  }
+
+  private long parsePath(TimelineDataManager tdm, Path logPath,
+      boolean appCompleted, JsonFactory jsonFactory, ObjectMapper objMapper,
+      FileSystem fs) throws IOException {
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(user);
+    FSDataInputStream in = fs.open(logPath);
+    JsonParser parser = null;
+    try {
+      in.seek(offset);
+      try {
+        parser = jsonFactory.createJsonParser(in);
+        parser.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+      } catch (IOException e) {
+        // If the app hasn't completed, the file may still be incomplete;
+        // treat parse errors as EOF until the app completes
+        if (appCompleted) {
+          throw e;
+        } else {
+          LOG.debug("Exception in parse path: {}", e.getMessage());
+          return 0;
+        }
+      }
+
+      return doParse(tdm, parser, objMapper, ugi, appCompleted);
+    } finally {
+      IOUtils.closeStream(parser);
+      IOUtils.closeStream(in);
+    }
+  }
+
+  protected abstract long doParse(TimelineDataManager tdm, JsonParser parser,
+      ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
+      throws IOException;
+}
+
+class EntityLogInfo extends LogInfo {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntityGroupFSTimelineStore.class);
+
+  public EntityLogInfo(String attemptId,
+      String file, String owner) {
+    super(attemptId, file, owner);
+  }
+
+  @Override
+  protected long doParse(TimelineDataManager tdm, JsonParser parser,
+      ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
+      throws IOException {
+    long count = 0;
+    TimelineEntities entities = new TimelineEntities();
+    ArrayList<TimelineEntity> entityList = new ArrayList<TimelineEntity>(1);
+    long bytesParsed;
+    long bytesParsedLastBatch = 0;
+    boolean postError = false;
+    try {
+      MappingIterator<TimelineEntity> iter = objMapper.readValues(parser,
+          TimelineEntity.class);
+
+      while (iter.hasNext()) {
+        TimelineEntity entity = iter.next();
+        String etype = entity.getEntityType();
+        String eid = entity.getEntityId();
+        LOG.trace("Read entity {}", etype);
+        ++count;
+        bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
+        LOG.trace("Parser now at offset {}", bytesParsed);
+
+        try {
+          LOG.debug("Adding {}({}) to store", eid, etype);
+          entityList.add(entity);
+          entities.setEntities(entityList);
+          TimelinePutResponse response = tdm.postEntities(entities, ugi);
+          for (TimelinePutResponse.TimelinePutError e
+              : response.getErrors()) {
+            LOG.warn("Error putting entity: {} ({}): {}",
+                e.getEntityId(), e.getEntityType(), e.getErrorCode());
+          }
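+          // Advance the stored offset only after a successful post so that
+          // a failed batch is re-read on the next scan.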
+          setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
+          bytesParsedLastBatch = bytesParsed;
+          entityList.clear();
+        } catch (YarnException | IOException e) {
+          postError = true;
+          throw new IOException("Error posting entities", e);
+        }
+      }
+    } catch (IOException e) {
+      // If the app hasn't completed, the file may still be incomplete;
+      // treat parse errors as EOF until the app completes
+      if (appCompleted || postError) {
+        throw e;
+      }
+    } catch (RuntimeException e) {
+      if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
+        throw e;
+      }
+    }
+    return count;
+  }
+}
+
+class DomainLogInfo extends LogInfo {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      EntityGroupFSTimelineStore.class);
+
+  public DomainLogInfo(String attemptDirName, String file,
+      String owner) {
+    super(attemptDirName, file, owner);
+  }
+
+  @Override
+  protected long doParse(TimelineDataManager tdm, JsonParser parser,
+      ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
+      throws IOException {
+    long count = 0;
+    long bytesParsed;
+    long bytesParsedLastBatch = 0;
+    boolean putError = false;
+    try {
+      MappingIterator<TimelineDomain> iter = objMapper.readValues(parser,
+          TimelineDomain.class);
+
+      while (iter.hasNext()) {
+        TimelineDomain domain = iter.next();
+        domain.setOwner(ugi.getShortUserName());
+        LOG.trace("Read domain {}", domain.getId());
+        ++count;
+        bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
+        LOG.trace("Parser now at offset {}", bytesParsed);
+
+        try {
+          tdm.putDomain(domain, ugi);
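+          // As in EntityLogInfo, only advance the offset once the put
+          // succeeds so a failed domain is re-read on the next scan.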
+          setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
+          bytesParsedLastBatch = bytesParsed;
+        } catch (YarnException | IOException e) {
+          putError = true;
+          throw new IOException("Error posting domain", e);
+        }
+      }
+    } catch (IOException e) {
+      // If the app hasn't completed, the file may still be incomplete;
+      // treat parse errors as EOF until the app completes
+      if (appCompleted || putError) {
+        throw e;
+      }
+    } catch (RuntimeException e) {
+      if (appCompleted || !(e.getCause() instanceof JsonParseException)) {
+        throw e;
+      }
+    }
+    return count;
+  }
+}

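A note on the matching logic above: matchesGroupId() only checks the boundary
after the group id, treating the id as a match when it is followed by one of
the ENTITY_FILE_NAME_DELIMITERS ('_' or '.') or ends the file name. A minimal
sketch of the expected behavior, using a hypothetical file name and a
throwaway driver class (LogInfoMatchSketch is illustrative only and not part
of this patch; it must live in the same package because EntityLogInfo and
matchesGroupId(String) are package-private):

package org.apache.hadoop.yarn.server.timeline;

// Illustrative sketch, not part of the patch: exercises the delimiter
// semantics of LogInfo#matchesGroupId(String) on a hypothetical file name.
public class LogInfoMatchSketch {
  public static void main(String[] args) {
    LogInfo log = new EntityLogInfo("attempt_1", "timeline_group1.summary",
        "user1");
    System.out.println(log.matchesGroupId("group1"));   // true: followed by '.'
    System.out.println(log.matchesGroupId("timeline")); // true: followed by '_'
    System.out.println(log.matchesGroupId("summary"));  // true: ends the name
    System.out.println(log.matchesGroupId("group"));    // false: next char is '1'
  }
}

Note that only the trailing boundary is checked; an id that starts
mid-segment (e.g. "ine_group1" above) would still match, which is acceptable
as long as group ids are unique tokens within entity file names.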
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java
new file mode 100644
index 0000000..9cdbf5f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityGroupPlugin.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Plugin to map a requested query (or an entity / set of entities) to a
+ * cache ID. The cache ID identifies the data set that needs to be scanned
+ * to serve the response for the query.
+ */
+public abstract class TimelineEntityGroupPlugin {
+
+  /**
+   * Get the {@link TimelineEntityGroupId}s for the data sets that need to be
+   * scanned to serve the query.
+   *
+   * @param entityType Entity Type being queried
+   * @param primaryFilter Primary filter being applied
+   * @param secondaryFilters Secondary filters being applied in the query
+   * @return a set of {@link TimelineEntityGroupId}s identifying the data
+   *         sets to scan
+   */
+  public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
+      String entityType, NameValuePair primaryFilter,
+      Collection<NameValuePair> secondaryFilters);
+
+  /**
+   * Get the {@link TimelineEntityGroupId}s for the data sets that need to be
+   * scanned to serve the query.
+   *
+   * @param entityType Entity Type being queried
+   * @param entityId Entity Id being requested
+   * @return a set of {@link TimelineEntityGroupId}s identifying the data
+   *         sets to scan
+   */
+  public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
+      String entityId,
+      String entityType);
+
+  /**
+   * Get the {@link TimelineEntityGroupId}s for the data sets that need to be
+   * scanned to serve the query.
+   *
+   * @param entityType Entity Type being queried
+   * @param entityIds Entity Ids being requested
+   * @param eventTypes Event Types being requested
+   * @return a set of {@link TimelineEntityGroupId}s identifying the data
+   *         sets to scan
+   */
+  public abstract Set<TimelineEntityGroupId> getTimelineEntityGroupId(
+      String entityType, SortedSet<String> entityIds,
+      Set<String> eventTypes);
+}

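As a concrete reading of this contract: an implementation should derive group
ids deterministically from its arguments so that the reader resolves the same
groups the writer used when naming entity files. Below is a hypothetical
sketch (HashBucketPlugin and NUM_BUCKETS are made-up names, and the entity id
layout "<application id>!<suffix>" is an assumption, not part of this patch)
that buckets entities per application by hashing the entity id:

package org.apache.hadoop.yarn.server.timeline;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.util.ConverterUtils;

// Hypothetical sketch, not part of this patch. Buckets entities into a
// fixed number of groups per application by hashing the entity id, assuming
// the writer encodes the owning application in each entity id as
// "<application id>!<suffix>".
public class HashBucketPlugin extends TimelineEntityGroupPlugin {

  private static final int NUM_BUCKETS = 4;

  private TimelineEntityGroupId toGroupId(String entityId) {
    ApplicationId appId = ConverterUtils.toApplicationId(
        entityId.substring(0, entityId.indexOf('!')));
    // Mask the sign bit so the bucket index is always non-negative
    int bucket = (entityId.hashCode() & Integer.MAX_VALUE) % NUM_BUCKETS;
    return TimelineEntityGroupId.newInstance(appId, "bucket_" + bucket);
  }

  @Override
  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
      String entityType) {
    Set<TimelineEntityGroupId> groups = new HashSet<TimelineEntityGroupId>();
    groups.add(toGroupId(entityId));
    return groups;
  }

  @Override
  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
      NameValuePair primaryFilter,
      Collection<NameValuePair> secondaryFilters) {
    // No entity id to hash here; returning null signals that this plugin
    // cannot narrow the query (assumption about how the store treats null).
    return null;
  }

  @Override
  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
      SortedSet<String> entityIds, Set<String> eventTypes) {
    Set<TimelineEntityGroupId> groups = new HashSet<TimelineEntityGroupId>();
    for (String entityId : entityIds) {
      groups.add(toGroupId(entityId));
    }
    return groups;
  }
}

Such a plugin would then be registered with the store via the group-id plugin
classes setting (yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes,
going by the constant used in EntityGroupFSTimelineStore elsewhere in this
patch).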
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a04f29ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java
new file mode 100644
index 0000000..6f61bba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file