You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2019/03/13 23:45:31 UTC

[hadoop] branch trunk updated: YARN-9016 DocumentStore as a backend for ATSv2. Contributed by Sushil Ks.

This is an automated email from the ASF dual-hosted git repository.

vrushali pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f235a94  YARN-9016 DocumentStore as a backend for ATSv2. Contributed by Sushil Ks.
f235a94 is described below

commit f235a942d5b7cab86f0cb4f5ba285f07cd939a40
Author: Vrushali C <vr...@apache.org>
AuthorDate: Wed Mar 13 16:45:23 2019 -0700

    YARN-9016 DocumentStore as a backend for ATSv2. Contributed by Sushil Ks.
---
 .../main/resources/assemblies/hadoop-yarn-dist.xml |   5 +
 .../pom.xml                                        | 150 +++++++
 .../DocumentStoreCollectionCreator.java            |  66 +++
 .../DocumentStoreTimelineReaderImpl.java           | 121 +++++
 .../DocumentStoreTimelineWriterImpl.java           | 285 ++++++++++++
 .../documentstore/DocumentStoreUtils.java          | 489 +++++++++++++++++++++
 .../documentstore/collection/CollectionType.java   |  44 ++
 .../document/NoDocumentFoundException.java         |  39 ++
 .../collection/document/TimelineDocument.java      |  37 ++
 .../document/entity/TimelineEntityDocument.java    | 242 ++++++++++
 .../document/entity/TimelineEventSubDoc.java       |  96 ++++
 .../document/entity/TimelineMetricSubDoc.java      | 167 +++++++
 .../collection/document/entity/package-info.java   |  30 ++
 .../flowactivity/FlowActivityDocument.java         | 131 ++++++
 .../document/flowactivity/FlowActivitySubDoc.java  |  73 +++
 .../document/flowactivity/package-info.java        |  29 ++
 .../document/flowrun/FlowRunDocument.java          | 239 ++++++++++
 .../collection/document/flowrun/package-info.java  |  29 ++
 .../collection/document/package-info.java          |  30 ++
 .../documentstore/collection/package-info.java     |  30 ++
 .../documentstore/lib/DocumentStoreFactory.java    |  96 ++++
 .../lib/DocumentStoreNotSupportedException.java    |  35 ++
 .../documentstore/lib/DocumentStoreVendor.java     |  39 ++
 .../documentstore/lib/package-info.java            |  30 ++
 .../documentstore/package-info.java                |  29 ++
 .../documentstore/reader/DocumentStoreReader.java  |  45 ++
 .../reader/TimelineCollectionReader.java           | 220 +++++++++
 .../cosmosdb/CosmosDBDocumentStoreReader.java      | 232 ++++++++++
 .../reader/cosmosdb/package-info.java              |  28 ++
 .../documentstore/reader/package-info.java         |  29 ++
 .../documentstore/writer/DocumentStoreWriter.java  |  35 ++
 .../writer/TimelineCollectionWriter.java           | 146 ++++++
 .../cosmosdb/CosmosDBDocumentStoreWriter.java      | 235 ++++++++++
 .../writer/cosmosdb/package-info.java              |  28 ++
 .../documentstore/writer/package-info.java         |  29 ++
 .../documentstore/DocumentStoreTestUtils.java      |  81 ++++
 .../timelineservice/documentstore/JsonUtils.java   |  59 +++
 .../TestDocumentStoreCollectionCreator.java        |  64 +++
 .../TestDocumentStoreTimelineReaderImpl.java       | 407 +++++++++++++++++
 .../TestDocumentStoreTimelineWriterImpl.java       |  90 ++++
 .../collection/TestDocumentOperations.java         | 177 ++++++++
 .../reader/DummyDocumentStoreReader.java           | 118 +++++
 .../writer/DummyDocumentStoreWriter.java           |  46 ++
 .../test/resources/documents/flowactivity-doc.json |  20 +
 .../src/test/resources/documents/flowrun-doc.json  | 126 ++++++
 .../documents/test-timeline-entities-doc.json      | 185 ++++++++
 .../test/resources/documents/timeline-app-doc.json | 203 +++++++++
 .../resources/documents/timeline-entities.json     | 119 +++++
 .../PerNodeAggTimelineCollectorMetrics.java        |   2 +-
 .../hadoop-yarn/hadoop-yarn-server/pom.xml         |   1 +
 50 files changed, 5285 insertions(+), 1 deletion(-)

diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
index a5c3c0e..4da4ac5 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
@@ -224,6 +224,10 @@
       <directory>hadoop-yarn/hadoop-yarn-csi/target/lib</directory>
       <outputDirectory>share/hadoop/${hadoop.component}/csi/lib</outputDirectory>
     </fileSet>
+    <fileSet>
+      <directory>hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/target/lib</directory>
+      <outputDirectory>share/hadoop/${hadoop.component}/timelineservice/lib</outputDirectory>
+    </fileSet>
   </fileSets>
   <moduleSets>
     <moduleSet>
@@ -231,6 +235,7 @@
         <include>org.apache.hadoop:hadoop-yarn-server-timelineservice</include>
         <include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-client</include>
         <include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common</include>
+        <include>org.apache.hadoop:hadoop-yarn-server-timelineservice-documentstore</include>
       </includes>
       <binaries>
         <outputDirectory>share/hadoop/${hadoop.component}/timelineservice</outputDirectory>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml
new file mode 100644
index 0000000..be668e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml
@@ -0,0 +1,150 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>hadoop-yarn-server</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>3.3.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hadoop-yarn-server-timelineservice-documentstore</artifactId>
+  <name>Apache Hadoop YARN TimelineService DocumentStore</name>
+
+  <properties>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+    <azure.documentdb.version>1.16.2</azure.documentdb.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.microsoft.azure</groupId>
+      <artifactId>azure-documentdb</artifactId>
+      <version>${azure.documentdb.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>2.8.9</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito2</artifactId>
+      <version>1.7.1</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mockito</groupId>
+          <artifactId>mockito-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>1.7.1</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.mockito</groupId>
+          <artifactId>mockito-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Apache RAT -->
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/documents/flowactivity-doc.json</exclude>
+            <exclude>src/test/resources/documents/flowrun-doc.json</exclude>
+            <exclude>src/test/resources/documents/test-timeline-entities-doc.json</exclude>
+            <exclude>src/test/resources/documents/timeline-app-doc.json</exclude>
+            <exclude>src/test/resources/documents/timeline-entities.json</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <additionalDependencies>
+            <additionnalDependency>
+              <groupId>junit</groupId>
+              <artifactId>junit</artifactId>
+              <version>4.11</version>
+            </additionnalDependency>
+          </additionalDependencies>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <includeScope>runtime</includeScope>
+              <excludeGroupIds>org.slf4j,org.apache.hadoop,com.github.stephenc.findbugs</excludeGroupIds>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreCollectionCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreCollectionCreator.java
new file mode 100755
index 0000000..15d1e3e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreCollectionCreator.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.SchemaCreator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link SchemaCreator} implementation that provisions the database and the
+ * timeline collections on the {@link DocumentStoreVendor} backend selected
+ * through the YARN configuration.
+ */
+public class DocumentStoreCollectionCreator implements SchemaCreator {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DocumentStoreCollectionCreator.class);
+
+  /**
+   * Creates the database followed by the application, entity, flow activity
+   * and flow run collections, in that order. Any exception raised on the way
+   * is logged and not propagated to the caller.
+   *
+   * @param args command line arguments; not used by this implementation.
+   */
+  @Override
+  public void createTimelineSchema(String[] args) {
+    try {
+      final Configuration yarnConf = new YarnConfiguration();
+      LOG.info("Creating database and collections for DocumentStore : {}",
+          DocumentStoreUtils.getStoreVendor(yarnConf));
+
+      // try-with-resources closes the writer even if a creation step fails
+      try (DocumentStoreWriter storeWriter =
+          DocumentStoreFactory.createDocumentStoreWriter(yarnConf)) {
+        storeWriter.createDatabase();
+        final CollectionType[] collectionTypes = {
+            CollectionType.APPLICATION,
+            CollectionType.ENTITY,
+            CollectionType.FLOW_ACTIVITY,
+            CollectionType.FLOW_RUN,
+        };
+        for (CollectionType collectionType : collectionTypes) {
+          storeWriter.createCollection(collectionType.getCollectionName());
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Error while creating Timeline Collections", e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java
new file mode 100755
index 0000000..2159132
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.TimelineCollectionReader;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is a generic document store timeline reader for reading the timeline
+ * entity information. Based on the {@link DocumentStoreVendor} that is
+ * configured, the documents are read from that backend.
+ */
+public class DocumentStoreTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DocumentStoreTimelineReaderImpl.class);
+
+  // Created in serviceInit; stays null if initialization never got that far.
+  private TimelineCollectionReader collectionReader;
+
+  public DocumentStoreTimelineReaderImpl() {
+    super(DocumentStoreTimelineReaderImpl.class.getName());
+  }
+
+  /**
+   * Looks up the configured store vendor (used for logging only) and creates
+   * the collection reader that backs every read operation of this service.
+   *
+   * @param conf configuration to initialize the service with.
+   * @throws Exception if the vendor lookup or the reader creation fails.
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    DocumentStoreVendor storeType = DocumentStoreUtils.getStoreVendor(conf);
+    LOG.info("Initializing Document Store Reader for : {}", storeType);
+    super.serviceInit(conf);
+    collectionReader = new TimelineCollectionReader(conf);
+  }
+
+  /**
+   * Stops the service and closes the collection reader, if one was created.
+   *
+   * @throws Exception if stopping the parent service or closing the reader
+   *           fails.
+   */
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+    LOG.info("Stopping Document Timeline Store reader...");
+    // The reader is absent when serviceInit did not complete, e.g. when the
+    // service is stopped after a failed initialization.
+    if (collectionReader != null) {
+      collectionReader.close();
+    }
+  }
+
+  /**
+   * Reads the single document addressed by the context and converts it to a
+   * {@link TimelineEntity}. Flow activity and flow run entities are built
+   * from the confs/metrics selection only; every other entity type is built
+   * from the complete {@link TimelineDataToRetrieve}.
+   *
+   * @param context reader context identifying the entity.
+   * @param dataToRetrieve selection of the data to put into the result.
+   * @return the entity created from the stored document.
+   * @throws IOException if the document cannot be read or converted.
+   */
+  @Override
+  public TimelineEntity getEntity(TimelineReaderContext context,
+      TimelineDataToRetrieve dataToRetrieve) throws IOException {
+    final String entityType = context.getEntityType();
+    final TimelineEntityDocument timelineEntityDoc =
+        collectionReader.readDocument(context);
+    if (isFlowEntityType(entityType)) {
+      return DocumentStoreUtils.createEntityToBeReturned(
+          timelineEntityDoc, dataToRetrieve.getConfsToRetrieve(),
+          dataToRetrieve.getMetricsToRetrieve());
+    }
+    return DocumentStoreUtils.createEntityToBeReturned(
+        timelineEntityDoc, dataToRetrieve);
+  }
+
+  /**
+   * Reads the documents addressed by the context, bounded by the limit of the
+   * given filters, and returns the entities that match those filters.
+   *
+   * @param context reader context identifying the entities.
+   * @param filters filters every returned entity has to match.
+   * @param dataToRetrieve selection of the data to put into the results.
+   * @return the matching entities.
+   * @throws IOException if the documents cannot be read or converted.
+   */
+  @Override
+  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
+      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+      throws IOException {
+    List<TimelineEntityDocument> entityDocs =
+        collectionReader.readDocuments(context, filters.getLimit());
+
+    return applyFilters(filters, dataToRetrieve, entityDocs);
+  }
+
+  /**
+   * Delegates to the collection reader to fetch the entity types for the
+   * given context.
+   *
+   * @param context reader context to fetch the entity types for.
+   * @return the entity types reported by the collection reader.
+   */
+  @Override
+  public Set<String> getEntityTypes(TimelineReaderContext context) {
+    return collectionReader.fetchEntityTypes(context);
+  }
+
+  /**
+   * Tells whether the type names a flow activity or a flow run entity. The
+   * names are compared as strings so that a null type or a type that is not
+   * a {@link TimelineEntityType} constant yields false instead of the
+   * exception {@code TimelineEntityType.valueOf} would throw.
+   */
+  private static boolean isFlowEntityType(String entityType) {
+    return TimelineEntityType.YARN_FLOW_ACTIVITY.name().equals(entityType)
+        || TimelineEntityType.YARN_FLOW_RUN.name().equals(entityType);
+  }
+
+  // for honoring all filters from {@link TimelineEntityFilters}
+  private Set<TimelineEntity> applyFilters(TimelineEntityFilters filters,
+      TimelineDataToRetrieve dataToRetrieve,
+      List<TimelineEntityDocument> entityDocs) throws IOException {
+    Set<TimelineEntity> timelineEntities = new HashSet<>();
+    for (TimelineEntityDocument entityDoc : entityDocs) {
+      final TimelineEntity timelineEntity = entityDoc.fetchTimelineEntity();
+
+      // documents whose entity does not satisfy the filters are left out
+      if (DocumentStoreUtils.isFilterNotMatching(filters, timelineEntity)) {
+        continue;
+      }
+
+      TimelineEntity entityToBeReturned = DocumentStoreUtils
+          .createEntityToBeReturned(entityDoc, dataToRetrieve);
+      timelineEntities.add(entityToBeReturned);
+    }
+    return timelineEntities;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java
new file mode 100755
index 0000000..572d888
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.*;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.TimelineCollectionWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * This is a generic document store timeline writer for storing the timeline
+ * entity information. Based on the {@link DocumentStoreVendor} that is
+ * configured, the documents are written to that backend.
+ */
+public class DocumentStoreTimelineWriterImpl extends AbstractService
+    implements TimelineWriter {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DocumentStoreTimelineWriterImpl.class);
+  private static final String DOC_ID_DELIMITER = "!";
+
+  private DocumentStoreVendor storeType;
+  // One writer per collection; all are created in serviceInit and stay null
+  // if initialization never got that far.
+  private TimelineCollectionWriter<TimelineEntityDocument> appCollWriter;
+  private TimelineCollectionWriter<TimelineEntityDocument>
+      entityCollWriter;
+  private TimelineCollectionWriter<FlowActivityDocument> flowActivityCollWriter;
+  private TimelineCollectionWriter<FlowRunDocument> flowRunCollWriter;
+
+
+  public DocumentStoreTimelineWriterImpl() {
+    super(DocumentStoreTimelineWriterImpl.class.getName());
+  }
+
+  /**
+   * Looks up the configured store vendor and creates one collection writer
+   * for each of the application, entity, flow activity and flow run
+   * collections.
+   *
+   * @param conf configuration to initialize the service with.
+   * @throws Exception if the vendor lookup or a writer creation fails.
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    storeType = DocumentStoreUtils.getStoreVendor(conf);
+    LOG.info("Initializing Document Store Writer for : {}", storeType);
+    super.serviceInit(conf);
+
+    this.appCollWriter = new TimelineCollectionWriter<>(
+        CollectionType.APPLICATION, conf);
+    this.entityCollWriter = new TimelineCollectionWriter<>(
+        CollectionType.ENTITY, conf);
+    this.flowActivityCollWriter = new TimelineCollectionWriter<>(
+        CollectionType.FLOW_ACTIVITY, conf);
+    this.flowRunCollWriter = new TimelineCollectionWriter<>(
+        CollectionType.FLOW_RUN, conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the service and closes every collection writer that was created.
+   *
+   * @throws Exception if stopping the parent service or closing a writer
+   *           fails.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+    closeIfCreated(appCollWriter);
+    closeIfCreated(entityCollWriter);
+    closeIfCreated(flowActivityCollWriter);
+    closeIfCreated(flowRunCollWriter);
+  }
+
+  /**
+   * Closes the writer unless it is null, which is the case when serviceInit
+   * did not complete, e.g. when the service is stopped after a failed
+   * initialization.
+   */
+  private static void closeIfCreated(TimelineCollectionWriter<?> writer)
+      throws Exception {
+    if (writer != null) {
+      writer.close();
+    }
+  }
+
+  /**
+   * Writes every non-null entity of the given batch. Application entities go
+   * to the application collection together with a flow run document and, if
+   * a created or finished event is present, a flow activity document. All
+   * other entities go to the entity collection. Nothing is written when the
+   * flow name, app id, cluster id or user id of the context is missing.
+   *
+   * @param context collector context the entities belong to.
+   * @param data entities to be written.
+   * @param callerUgi caller whose short user name is recorded as the sub
+   *          application user.
+   * @return a new, unmodified {@link TimelineWriteResponse}.
+   */
+  @Override
+  public TimelineWriteResponse write(TimelineCollectorContext
+      context, TimelineEntities data, UserGroupInformation callerUgi) {
+    LOG.debug("Writing Timeline Entity for appID : {}", context.getAppId());
+    TimelineWriteResponse putStatus = new TimelineWriteResponse();
+    String subApplicationUser = callerUgi.getShortUserName();
+
+    //Avoiding NPE for document id
+    if (DocumentStoreUtils.isNullOrEmpty(context.getFlowName(),
+        context.getAppId(), context.getClusterId(), context.getUserId())) {
+      LOG.warn("Found NULL for one of: flowName={} appId={} " +
+          "userId={} clusterId={} . Not proceeding on writing to store : {}",
+          context.getFlowName(), context.getAppId(), context.getUserId(),
+          context.getClusterId(), storeType);
+      return putStatus;
+    }
+
+    for (TimelineEntity timelineEntity : data.getEntities()) {
+      // a set can have at most 1 null
+      if(timelineEntity == null) {
+        continue;
+      }
+
+      TimelineEntityDocument entityDocument;
+      //If the entity is application, it will be stored in Application
+      // Collection
+      if (ApplicationEntity.isApplicationEntity(timelineEntity)) {
+        entityDocument = createTimelineEntityDoc(context, subApplicationUser,
+            timelineEntity, true);
+        // if it's an application entity, store metrics for aggregation
+        FlowRunDocument flowRunDoc = createFlowRunDoc(context,
+            timelineEntity.getMetrics());
+        // fetch flow activity if App is created or finished
+        FlowActivityDocument flowActivityDoc = getFlowActivityDoc(context,
+            timelineEntity, flowRunDoc, entityDocument);
+        writeApplicationDoc(entityDocument);
+        writeFlowRunDoc(flowRunDoc);
+        if(flowActivityDoc != null) {
+          storeFlowActivityDoc(flowActivityDoc);
+        }
+      } else {
+        entityDocument = createTimelineEntityDoc(context, subApplicationUser,
+            timelineEntity, false);
+        // NOTE(review): the document above is created before the user id of
+        // the shared context is extended here, so later entities of the same
+        // call are created from the extended user id - confirm this ordering
+        // is intended.
+        appendSubAppUserIfExists(context, subApplicationUser);
+        // The entity will be stored in Entity Collection
+        entityDocument.setCreatedTime(fetchEntityCreationTime(timelineEntity));
+        writeEntityDoc(entityDocument);
+      }
+    }
+    return putStatus;
+  }
+
+  /**
+   * Domain writes are not implemented by this writer.
+   *
+   * @param context collector context of the domain.
+   * @param domain domain to be written.
+   * @return always null.
+   * @throws IOException never thrown by this implementation.
+   */
+  @Override
+  public TimelineWriteResponse write(TimelineCollectorContext context,
+      TimelineDomain domain) throws IOException {
+    return null;
+  }
+
+  /**
+   * Appends the sub application user to the user id of the context, joined
+   * by {@link #DOC_ID_DELIMITER}, unless the user id already equals or
+   * contains the sub application user.
+   */
+  private void appendSubAppUserIfExists(TimelineCollectorContext context,
+      String subApplicationUser) {
+    String userId = context.getUserId();
+    if (!userId.equals(subApplicationUser) &&
+        !userId.contains(subApplicationUser)) {
+      userId = userId.concat(DOC_ID_DELIMITER).concat(subApplicationUser);
+      context.setUserId(userId);
+    }
+  }
+
+  /**
+   * Builds the document for an entity and assigns its id. The id of an
+   * application entity is built from the context and the entity type; the id
+   * of any other entity additionally includes the entity id.
+   */
+  private TimelineEntityDocument createTimelineEntityDoc(
+      TimelineCollectorContext context, String subApplicationUser,
+      TimelineEntity timelineEntity, boolean isAppEntity) {
+    TimelineEntityDocument entityDocument =
+        new TimelineEntityDocument(timelineEntity);
+    entityDocument.setContext(context);
+    entityDocument.setFlowVersion(context.getFlowVersion());
+    entityDocument.setSubApplicationUser(subApplicationUser);
+    if (isAppEntity) {
+      entityDocument.setId(DocumentStoreUtils.constructTimelineEntityDocId(
+          context, timelineEntity.getType()));
+    } else {
+      entityDocument.setId(DocumentStoreUtils.constructTimelineEntityDocId(
+          context, timelineEntity.getType(), timelineEntity.getId()));
+    }
+    return entityDocument;
+  }
+
+  /**
+   * Builds the flow run document for the context and the given metrics.
+   */
+  private FlowRunDocument createFlowRunDoc(TimelineCollectorContext context,
+      Set<TimelineMetric> metrics) {
+    FlowRunDocument flowRunDoc = new FlowRunDocument(context, metrics);
+    flowRunDoc.setFlowVersion(context.getFlowVersion());
+    flowRunDoc.setId(DocumentStoreUtils.constructFlowRunDocId(context));
+    return flowRunDoc;
+  }
+
+  /**
+   * Determines the creation time of a non-application entity. Containers use
+   * the timestamp of their created event and application attempts that of
+   * their registered event, when present. Otherwise the created time of the
+   * entity is used, or 0 if it is not set.
+   */
+  private long fetchEntityCreationTime(TimelineEntity timelineEntity) {
+    // Entity types are compared by name: types that are not constants of
+    // TimelineEntityType would make TimelineEntityType.valueOf throw.
+    final String entityType = timelineEntity.getType();
+    TimelineEvent event = null;
+    if (TimelineEntityType.YARN_CONTAINER.name().equals(entityType)) {
+      event = DocumentStoreUtils.fetchEvent(
+          timelineEntity, ContainerMetricsConstants.CREATED_EVENT_TYPE);
+    } else if (TimelineEntityType.YARN_APPLICATION_ATTEMPT.name()
+        .equals(entityType)) {
+      event = DocumentStoreUtils.fetchEvent(
+          timelineEntity, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+    }
+    if (event != null) {
+      return event.getTimestamp();
+    }
+    if (timelineEntity.getCreatedTime() == null) {
+      return 0;
+    }
+    return timelineEntity.getCreatedTime();
+  }
+
+  /**
+   * Creates the flow activity document for an application entity that
+   * carries a created and/or finished event, and records the event times on
+   * the entity and flow run documents.
+   *
+   * @return the flow activity document, or null if the entity has neither a
+   *         created nor a finished event.
+   */
+  private FlowActivityDocument getFlowActivityDoc(
+      TimelineCollectorContext context,
+      TimelineEntity timelineEntity, FlowRunDocument flowRunDoc,
+      TimelineEntityDocument entityDocument) {
+    FlowActivityDocument flowActivityDoc = null;
+    // check if the application is created
+    TimelineEvent event = DocumentStoreUtils.fetchEvent(
+        timelineEntity, ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    if (event != null) {
+      entityDocument.setCreatedTime(event.getTimestamp());
+      flowRunDoc.setMinStartTime(event.getTimestamp());
+      flowActivityDoc = createFlowActivityDoc(context, context.getFlowName(),
+          context.getFlowVersion(), context.getFlowRunId(), event);
+    }
+
+    // if application has finished, store its finish time
+    event = DocumentStoreUtils.fetchEvent(timelineEntity,
+        ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    if (event != null) {
+      flowRunDoc.setMaxEndTime(event.getTimestamp());
+
+      // this check is to handle in case both create and finish event exist
+      // under the single list of events for a TimelineEntity
+      if (flowActivityDoc == null) {
+        flowActivityDoc = createFlowActivityDoc(context, context.getFlowName(),
+            context.getFlowVersion(), context.getFlowRunId(), event);
+      }
+    }
+    return flowActivityDoc;
+  }
+
+  /**
+   * Builds a flow activity document whose day timestamp and id are derived
+   * from the timestamp of the given event.
+   */
+  private FlowActivityDocument createFlowActivityDoc(
+      TimelineCollectorContext context, String flowName, String flowVersion,
+      long flowRunId, TimelineEvent event) {
+    FlowActivityDocument flowActivityDoc = new FlowActivityDocument(flowName,
+        flowVersion, flowRunId);
+    flowActivityDoc.setDayTimestamp(DocumentStoreUtils.getTopOfTheDayTimestamp(
+        event.getTimestamp()));
+    flowActivityDoc.setFlowName(flowName);
+    flowActivityDoc.setUser(context.getUserId());
+    flowActivityDoc.setId(DocumentStoreUtils.constructFlowActivityDocId(
+        context, event.getTimestamp()));
+    return flowActivityDoc;
+  }
+
+  private void writeFlowRunDoc(FlowRunDocument flowRunDoc) {
+    flowRunCollWriter.writeDocument(flowRunDoc);
+  }
+
+  private void storeFlowActivityDoc(FlowActivityDocument flowActivityDoc) {
+    flowActivityCollWriter.writeDocument(flowActivityDoc);
+  }
+
+  private void writeEntityDoc(TimelineEntityDocument entityDocument) {
+    entityCollWriter.writeDocument(entityDocument);
+  }
+
+  private void writeApplicationDoc(TimelineEntityDocument entityDocument) {
+    appCollWriter.writeDocument(entityDocument);
+  }
+
+  /**
+   * Aggregation is not implemented by this writer.
+   *
+   * @param data entity to aggregate.
+   * @param track aggregation track.
+   * @return always null.
+   */
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) {
+    return null;
+  }
+
+  /** No-op: this writer does not do anything on flush. */
+  @Override
+  public void flush() {
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java
new file mode 100755
index 0000000..4b14d47
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import com.microsoft.azure.documentdb.ConnectionPolicy;
+import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.DocumentClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEventSubDoc;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineMetricSubDoc;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * This class consists of all the utils required for reading or writing
+ * documents for a {@link DocumentStoreVendor}.
+ */
+public final class DocumentStoreUtils {
+
+  private DocumentStoreUtils(){}
+
+  /** milliseconds in one day. */
+  private static final long MILLIS_ONE_DAY = 86400000L;
+
+  private static final String TIMELINE_STORE_TYPE =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "document-store-type";
+  static final String TIMELINE_SERVICE_COSMOSDB_ENDPOINT =
+      "yarn.timeline-service.document-store.cosmos-db.endpoint";
+  static final String TIMELINE_SERVICE_COSMOSDB_MASTER_KEY =
+      "yarn.timeline-service.document-store.cosmos-db.masterkey";
+  static final String TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME =
+      "yarn.timeline-service.document-store.db-name";
+  private static final String
+      DEFAULT_TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME = "timeline_service";
+
+  /**
+   * Checks whether the CosmosDB endpoint and master key are set in the conf.
+   * @param conf
+   *             related to yarn
+   * @throws YarnException if required config properties are missing
+   */
+  public static void validateCosmosDBConf(Configuration conf)
+      throws YarnException {
+    if (conf == null) {
+      throw new NullPointerException("Configuration cannot be null");
+    }
+    if (isNullOrEmpty(conf.get(TIMELINE_SERVICE_COSMOSDB_ENDPOINT),
+        conf.get(TIMELINE_SERVICE_COSMOSDB_MASTER_KEY))) {
+      throw new YarnException("One or more CosmosDB configuration property is" +
+          " missing in yarn-site.xml");
+    }
+  }
+
+  /**
+   * Retrieves {@link DocumentStoreVendor} configured.
+   * @param conf
+   *             related to yarn
+   * @return Returns the {@link DocumentStoreVendor} that is configured, else
+   *         uses {@link DocumentStoreVendor#COSMOS_DB} as default
+   */
+  public static DocumentStoreVendor getStoreVendor(Configuration conf) {
+    return DocumentStoreVendor.getStoreType(conf.get(TIMELINE_STORE_TYPE,
+        DocumentStoreVendor.COSMOS_DB.name()));
+  }
+
+  /**
+   * Retrieves a {@link TimelineEvent} from {@link TimelineEntity#events}.
+   * @param timelineEntity
+   *                      from which the set of events are examined.
+   * @param eventType
+   *                that has to be checked.
+   * @return {@link TimelineEvent} if found else null
+   */
+  public static TimelineEvent fetchEvent(TimelineEntity timelineEntity,
+      String eventType) {
+    for (TimelineEvent event : timelineEntity.getEvents()) {
+      if (event.getId().equals(eventType)) {
+        return event;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Checks if any of the given strings is null or empty.
+   * @param values
+   *             array of string to be checked
+   * @return true if any of the strings is null or empty, else false
+   */
+  public static boolean isNullOrEmpty(String...values) {
+    for (String value : values) {
+      if (value == null || value.isEmpty()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Builds a CosmosDB {@link DocumentClient} from the configured endpoint and
+   * master key, with the default connection policy and session consistency.
+   * @param conf to retrieve cosmos db endpoint and key
+   * @return document client for CosmosDB
+   */
+  public static DocumentClient createCosmosDBClient(Configuration conf){
+    String endpoint = getCosmosDBEndpoint(conf);
+    return new DocumentClient(endpoint, getCosmosDBMasterKey(conf),
+        ConnectionPolicy.GetDefault(), ConsistencyLevel.Session);
+  }
+
+  /**
+   * Returns the timestamp of the day's start (which is midnight 00:00:00 AM)
+   * for a given input timestamp.
+   *
+   * @param timeStamp Timestamp.
+   * @return timestamp of that day's beginning (midnight)
+   */
+  public static long getTopOfTheDayTimestamp(long timeStamp) {
+    return timeStamp - (timeStamp % MILLIS_ONE_DAY);
+  }
+
+  /**
+   * Creates a composite key for storing {@link TimelineEntityDocument}.
+   * @param collectorContext
+   *              of the timeline writer
+   * @param type
+   *            of the entity
+   * @return composite key delimited with !
+   */
+  public static String constructTimelineEntityDocId(TimelineCollectorContext
+      collectorContext, String type) {
+    return String.format("%s!%s!%s!%d!%s!%s",
+        collectorContext.getClusterId(), collectorContext.getUserId(),
+        collectorContext.getFlowName(), collectorContext.getFlowRunId(),
+        collectorContext.getAppId(), type);
+  }
+
+  /**
+   * Creates a composite key for storing {@link TimelineEntityDocument}.
+   * @param collectorContext
+   *              of the timeline writer
+   * @param type
+   *            of the entity
+   * @param id
+   *            of the entity
+   * @return composite key delimited with !
+   */
+  public static String constructTimelineEntityDocId(TimelineCollectorContext
+      collectorContext, String type, String id) {
+    return String.format("%s!%s!%s!%d!%s!%s!%s",
+        collectorContext.getClusterId(), collectorContext.getUserId(),
+        collectorContext.getFlowName(), collectorContext.getFlowRunId(),
+        collectorContext.getAppId(), type, id);
+  }
+
+  /**
+   * Creates a composite key for storing {@link FlowRunDocument}.
+   * @param collectorContext
+   *              of the timeline writer
+   * @return composite key delimited with !
+   */
+  public static String constructFlowRunDocId(TimelineCollectorContext
+      collectorContext) {
+    return String.format("%s!%s!%s!%s", collectorContext.getClusterId(),
+        collectorContext.getUserId(), collectorContext.getFlowName(),
+        collectorContext.getFlowRunId());
+  }
+
+  /**
+   * Creates a composite key for storing {@link FlowActivityDocument}.
+   * @param collectorContext
+   *              of the timeline writer
+   * @param eventTimestamp
+   *              of the timeline entity
+   * @return composite key delimited with !
+   */
+  public static String constructFlowActivityDocId(TimelineCollectorContext
+      collectorContext, long eventTimestamp) {
+    return String.format("%s!%s!%s!%s", collectorContext.getClusterId(),
+        getTopOfTheDayTimestamp(eventTimestamp),
+        collectorContext.getUserId(), collectorContext.getFlowName());
+  }
+
+  private static String getCosmosDBEndpoint(Configuration conf) {
+    return conf.get(TIMELINE_SERVICE_COSMOSDB_ENDPOINT);
+  }
+
+  private static String getCosmosDBMasterKey(Configuration conf) {
+    return conf.get(TIMELINE_SERVICE_COSMOSDB_MASTER_KEY);
+  }
+
+  public static String getCosmosDBDatabaseName(Configuration conf) {
+    return conf.get(TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME,
+        getDefaultTimelineServiceDBName(conf));
+  }
+
+  private static String getDefaultTimelineServiceDBName(
+      Configuration conf) {
+    return getClusterId(conf) + "_" +
+        DEFAULT_TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME;
+  }
+
+  private static String getClusterId(Configuration conf) {
+    return conf.get(YarnConfiguration.RM_CLUSTER_ID,
+        YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+  }
+
+  private static boolean isTimeInRange(long time, long timeBegin,
+      long timeEnd) {
+    return (time >= timeBegin) && (time <= timeEnd);
+  }
+
+  /**
+   * Checks if the {@link TimelineEntityFilters} are not matching for a given
+   * {@link TimelineEntity}.
+   * @param filters
+   *              that has to be checked for an entity
+   * @param timelineEntity
+   *                for which the filters would be applied
+   * @return true if any one of the filter is not matching else false
+   * @throws IOException if an unsupported filter is being matched.
+   */
+  static boolean isFilterNotMatching(TimelineEntityFilters filters,
+      TimelineEntity timelineEntity) throws IOException {
+    if (timelineEntity.getCreatedTime() != null && !isTimeInRange(timelineEntity
+        .getCreatedTime(), filters.getCreatedTimeBegin(),
+        filters.getCreatedTimeEnd())) {
+      return true;
+    }
+
+    if (filters.getRelatesTo() != null &&
+        !filters.getRelatesTo().getFilterList().isEmpty() &&
+        !TimelineStorageUtils.matchRelatesTo(timelineEntity,
+            filters.getRelatesTo())) {
+      return true;
+    }
+
+    if (filters.getIsRelatedTo() != null &&
+        !filters.getIsRelatedTo().getFilterList().isEmpty() &&
+        !TimelineStorageUtils.matchIsRelatedTo(timelineEntity,
+            filters.getIsRelatedTo())) {
+      return true;
+    }
+
+    if (filters.getInfoFilters() != null &&
+        !filters.getInfoFilters().getFilterList().isEmpty() &&
+        !TimelineStorageUtils.matchInfoFilters(timelineEntity,
+            filters.getInfoFilters())) {
+      return true;
+    }
+
+    if (filters.getConfigFilters() != null &&
+        !filters.getConfigFilters().getFilterList().isEmpty() &&
+        !TimelineStorageUtils.matchConfigFilters(timelineEntity,
+            filters.getConfigFilters())) {
+      return true;
+    }
+
+    if (filters.getMetricFilters() != null &&
+        !filters.getMetricFilters().getFilterList().isEmpty() &&
+        !TimelineStorageUtils.matchMetricFilters(timelineEntity,
+            filters.getMetricFilters())) {
+      return true;
+    }
+
+    return filters.getEventFilters() != null &&
+        !filters.getEventFilters().getFilterList().isEmpty() &&
+        !TimelineStorageUtils.matchEventFilters(timelineEntity,
+            filters.getEventFilters());
+  }
+
+  /**
+   * Creates the final entity to be returned as the result.
+   * @param timelineEntityDocument
+   *                         which has all the information for the entity
+   * @param dataToRetrieve
+   *                     specifies filters and fields to retrieve
+   * @return {@link TimelineEntity} as the result
+   */
+  public static TimelineEntity createEntityToBeReturned(
+      TimelineEntityDocument timelineEntityDocument,
+      TimelineDataToRetrieve dataToRetrieve) {
+    TimelineEntity entityToBeReturned = createTimelineEntity(
+        timelineEntityDocument.getType(),
+        timelineEntityDocument.fetchTimelineEntity());
+
+    entityToBeReturned.setIdentifier(new TimelineEntity.Identifier(
+        timelineEntityDocument.getType(), timelineEntityDocument.getId()));
+    entityToBeReturned.setCreatedTime(
+        timelineEntityDocument.getCreatedTime());
+    entityToBeReturned.setInfo(timelineEntityDocument.getInfo());
+
+    if (dataToRetrieve.getFieldsToRetrieve() != null) {
+      fillFields(entityToBeReturned, timelineEntityDocument,
+          dataToRetrieve);
+    }
+    return entityToBeReturned;
+  }
+
+  /**
+   * Creates the final entity to be returned as the result.
+   * @param timelineEntityDocument
+   *                         which has all the information for the entity
+   * @param confsToRetrieve
+   *                     specifies config filters to be applied
+   * @param metricsToRetrieve
+   *                     specifies metric filters to be applied
+   *
+   * @return {@link TimelineEntity} as the result
+   */
+  public static TimelineEntity createEntityToBeReturned(
+      TimelineEntityDocument timelineEntityDocument,
+      TimelineFilterList confsToRetrieve,
+      TimelineFilterList metricsToRetrieve) {
+    TimelineEntity timelineEntity = timelineEntityDocument
+        .fetchTimelineEntity();
+    if (confsToRetrieve != null) {
+      timelineEntity.setConfigs(DocumentStoreUtils.applyConfigFilter(
+          confsToRetrieve, timelineEntity.getConfigs()));
+    }
+    if (metricsToRetrieve != null) {
+      timelineEntity.setMetrics(DocumentStoreUtils.transformMetrics(
+          metricsToRetrieve, timelineEntityDocument.getMetrics()));
+    }
+    return timelineEntity;
+  }
+
+  private static TimelineEntity createTimelineEntity(String type,
+      TimelineEntity timelineEntity) {
+    // Compare by name: valueOf() throws for user-defined entity types.
+    if (TimelineEntityType.YARN_APPLICATION.name().equals(type)) {
+      return new ApplicationEntity();
+    } else if (TimelineEntityType.YARN_FLOW_RUN.name().equals(type)) {
+      return new FlowRunEntity();
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.name().equals(type)) {
+      FlowActivityEntity flowActivityEntity =
+          (FlowActivityEntity) timelineEntity;
+      FlowActivityEntity newFlowActivity = new FlowActivityEntity();
+      newFlowActivity.addFlowRuns(flowActivityEntity.getFlowRuns());
+      return newFlowActivity;
+    }
+    // Any other (including custom or null) type maps to a generic entity.
+    return new TimelineEntity();
+  }
+
+  // Copies each requested field from entityDoc to finalEntity; ALL = every one.
+  private static void fillFields(TimelineEntity finalEntity,
+      TimelineEntityDocument entityDoc,
+      TimelineDataToRetrieve dataToRetrieve) {
+    EnumSet<TimelineReader.Field> fieldsToRetrieve =
+        dataToRetrieve.getFieldsToRetrieve();
+    if (fieldsToRetrieve.contains(TimelineReader.Field.ALL)) {
+      fieldsToRetrieve = EnumSet.allOf(TimelineReader.Field.class);
+    }
+    for (TimelineReader.Field field : fieldsToRetrieve) {
+      switch(field) {
+      case CONFIGS:
+        finalEntity.setConfigs(applyConfigFilter(
+            dataToRetrieve.getConfsToRetrieve(), entityDoc.getConfigs()));
+        break;
+      case METRICS:
+        finalEntity.setMetrics(transformMetrics(
+            dataToRetrieve.getMetricsToRetrieve(), entityDoc.getMetrics()));
+        break;
+      case INFO:
+        finalEntity.setInfo(entityDoc.getInfo());
+        break;
+      case IS_RELATED_TO:
+        finalEntity.setIsRelatedToEntities(entityDoc.getIsRelatedToEntities());
+        break;
+      case RELATES_TO:
+        finalEntity.setRelatesToEntities(entityDoc.getRelatesToEntities());
+        break;
+      case EVENTS:
+        finalEntity.setEvents(transformEvents(entityDoc.getEvents().values()));
+        break;
+      default: // remaining values (e.g. ALL itself) carry no data of their own
+      }
+    }
+  }
+
+  /* Transforms Collection<Set<TimelineEventSubDoc>> to
+     NavigableSet<TimelineEvent> */
+  private static NavigableSet<TimelineEvent> transformEvents(
+      Collection<Set<TimelineEventSubDoc>> eventSetColl) {
+    NavigableSet<TimelineEvent> timelineEvents = new TreeSet<>();
+    for (Set<TimelineEventSubDoc> eventSubDocs : eventSetColl) {
+      for (TimelineEventSubDoc eventSubDoc : eventSubDocs) {
+        timelineEvents.add(eventSubDoc.fetchTimelineEvent());
+      }
+    }
+    return timelineEvents;
+  }
+
+  public static Set<TimelineMetric> transformMetrics(
+      TimelineFilterList metricsToRetrieve,
+      Map<String, Set<TimelineMetricSubDoc>> metrics) {
+    if (metricsToRetrieve == null ||
+        hasDataToBeRetrieve(metricsToRetrieve, metrics.keySet())) {
+      Set<TimelineMetric> metricSet = new HashSet<>();
+      for(Set<TimelineMetricSubDoc> metricSubDocs : metrics.values()) {
+        for(TimelineMetricSubDoc metricSubDoc : metricSubDocs) {
+          metricSet.add(metricSubDoc.fetchTimelineMetric());
+        }
+      }
+      return metricSet;
+    }
+    return new HashSet<>();
+  }
+
+  public static Map<String, String> applyConfigFilter(
+      TimelineFilterList configsToRetrieve, Map<String, String> configs) {
+    if (configsToRetrieve == null ||
+        hasDataToBeRetrieve(configsToRetrieve, configs.keySet())) {
+      return configs;
+    }
+    return new HashMap<>();
+  }
+
+  private static boolean hasDataToBeRetrieve(
+      TimelineFilterList timelineFilters, Set<String> dataSet) {
+    // Collect the prefixes named by the filter list; every filter is cast to
+    // TimelinePrefixFilter, so other filter kinds are not supported here.
+    Set<String> dataToBeRetrieved = new HashSet<>();
+    for (TimelineFilter timelineFilter : timelineFilters.getFilterList()) {
+      dataToBeRetrieved.add(
+          ((TimelinePrefixFilter) timelineFilter).getPrefix());
+    }
+    // An empty prefix set matches under either operator. Each case returns
+    // explicitly instead of falling through to the next one.
+    boolean noPrefixes = dataToBeRetrieved.isEmpty();
+    switch (timelineFilters.getOperator()) {
+    case OR:
+      // at least one requested prefix must equal an entry of dataSet
+      return noPrefixes || !Collections.disjoint(dataSet, dataToBeRetrieved);
+    case AND:
+      // every requested prefix must equal an entry of dataSet
+      return noPrefixes || dataSet.containsAll(dataToBeRetrieved);
+    default:
+      return false;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/CollectionType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/CollectionType.java
new file mode 100755
index 0000000..f40e2c2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/CollectionType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection;
+
+/**
+ * Specifies the different collections that are currently used for storing
+ * documents.
+ */
+public enum CollectionType {
+  ENTITY("EntityCollection"),
+  APPLICATION("AppCollection"),
+  FLOW_RUN("FlowRunCollection"),
+  FLOW_ACTIVITY("FlowActivityCollection");
+
+  private final String collectionName;
+
+  CollectionType(String collectionName) {
+    this.collectionName = collectionName;
+  }
+
+  public boolean equals(String otherCollectionName) {
+    return this.collectionName.equals(otherCollectionName);
+  }
+
+  public String getCollectionName() {
+    return collectionName;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/NoDocumentFoundException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/NoDocumentFoundException.java
new file mode 100755
index 0000000..f6c123d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/NoDocumentFoundException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document;
+
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+
+import java.io.IOException;
+
+/**
+ * Indicates that the document that was requested is not found from the
+ * Document Store. This is a generic exception that will be thrown for all
+ * the {@link DocumentStoreVendor} if there is no document while reading.
+ */
+public class NoDocumentFoundException extends IOException {
+
+  /**
+   * Constructs exception with the specified detail message.
+   * @param message detailed message.
+   */
+  public NoDocumentFoundException(String message) {
+    super(message);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/TimelineDocument.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/TimelineDocument.java
new file mode 100755
index 0000000..c0d53f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/TimelineDocument.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document;
+
+
+/**
+ * This is an interface for all the Timeline Documents. Any new document that
+ * has to be persisted in the document store should implement this.
+ */
+public interface TimelineDocument<Document> {
+
+  String getId();
+
+  String getType();
+
+  long getCreatedTime();
+
+  void setCreatedTime(long time);
+
+  void merge(Document timelineDocument);
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEntityDocument.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEntityDocument.java
new file mode 100755
index 0000000..ea72ee3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEntityDocument.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.TimelineContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This is a generic class which contains all the meta information of some
+ * conceptual entity and its related events. The timeline entity can be an
+ * application, an attempt, a container or whatever the user-defined object.
+ */
+public class TimelineEntityDocument implements
+    TimelineDocument<TimelineEntityDocument> {
+
+  private final TimelineEntity timelineEntity;
+  private TimelineContext context;
+  private String flowVersion;
+  private String subApplicationUser;
+  private final Map<String, Set<TimelineMetricSubDoc>>
+      metrics = new HashMap<>();
+  private final Map<String, Set<TimelineEventSubDoc>>
+      events = new HashMap<>();
+
+  public TimelineEntityDocument() {
+    timelineEntity = new TimelineEntity();
+  }
+
+  public TimelineEntityDocument(TimelineEntity timelineEntity) {
+    this.timelineEntity = timelineEntity;
+    transformEvents(timelineEntity.getEvents());
+    timelineMetrics(timelineEntity.getMetrics());
+  }
+
+  // transforms TimelineMetric to TimelineMetricSubDoc
+  private void timelineMetrics(Set<TimelineMetric> timelineMetrics) {
+    for (TimelineMetric timelineMetric : timelineMetrics) {
+      if (this.metrics.containsKey(timelineMetric.getId())) {
+        this.metrics.get(timelineMetric.getId()).add(
+            new TimelineMetricSubDoc(timelineMetric));
+      } else {
+        Set<TimelineMetricSubDoc> metricSet = new HashSet<>();
+        metricSet.add(new TimelineMetricSubDoc(timelineMetric));
+        this.metrics.put(timelineMetric.getId(), metricSet);
+      }
+    }
+  }
+
+  // transforms TimelineEvent to TimelineEventSubDoc
+  private void transformEvents(Set<TimelineEvent> timelineEvents) {
+    for (TimelineEvent timelineEvent : timelineEvents) {
+      if (this.events.containsKey(timelineEvent.getId())) {
+        this.events.get(timelineEvent.getId())
+            .add(new TimelineEventSubDoc(timelineEvent));
+      } else {
+        Set<TimelineEventSubDoc> eventSet = new HashSet<>();
+        eventSet.add(new TimelineEventSubDoc(timelineEvent));
+        this.events.put(timelineEvent.getId(), eventSet);
+      }
+    }
+  }
+
+  /**
+   * Merge the TimelineEntityDocument that is passed with the current
+   * document for upsert.
+   *
+   * @param newTimelineDocument
+   *          that has to be merged
+   */
+  @Override
+  public void merge(TimelineEntityDocument newTimelineDocument) {
+    if(newTimelineDocument.getCreatedTime() > 0) {
+      timelineEntity.setCreatedTime(newTimelineDocument.getCreatedTime());
+    }
+    setMetrics(newTimelineDocument.getMetrics());
+    setEvents(newTimelineDocument.getEvents());
+    timelineEntity.getInfo().putAll(newTimelineDocument.getInfo());
+    timelineEntity.getConfigs().putAll(newTimelineDocument.getConfigs());
+    timelineEntity.getIsRelatedToEntities().putAll(newTimelineDocument
+        .getIsRelatedToEntities());
+    timelineEntity.getRelatesToEntities().putAll(newTimelineDocument
+        .getRelatesToEntities());
+  }
+
+  @Override
+  public String getId() {
+    return timelineEntity.getId();
+  }
+
+  public void setId(String key) {
+    timelineEntity.setId(key);
+  }
+
+  public String getType() {
+    return timelineEntity.getType();
+  }
+
+  public void setType(String type) {
+    timelineEntity.setType(type);
+  }
+
+  public Map<String, Object> getInfo() {
+    timelineEntity.getInfo().put(TimelineReaderUtils.FROMID_KEY, getId());
+    return timelineEntity.getInfo();
+  }
+
+  public void setInfo(Map<String, Object> info) {
+    timelineEntity.setInfo(info);
+  }
+
+  public Map<String, Set<TimelineMetricSubDoc>> getMetrics() {
+    return metrics;
+  }
+
+  public void setMetrics(Map<String, Set<TimelineMetricSubDoc>> metrics) {
+    for (String metricId : metrics.keySet()) {
+      for(TimelineMetricSubDoc metricSubDoc : metrics.get(metricId)) {
+        timelineEntity.addMetric(metricSubDoc.fetchTimelineMetric());
+      }
+      if (this.metrics.containsKey(metricId)) {
+        this.metrics.get(metricId).addAll(metrics.get(metricId));
+      } else {
+        this.metrics.put(metricId, new HashSet<>(metrics.get(metricId)));
+      }
+    }
+  }
+
+  public Map<String, Set<TimelineEventSubDoc>> getEvents() {
+    return events;
+  }
+
+  public void setEvents(Map<String, Set<TimelineEventSubDoc>> events) {
+    for (String eventId : events.keySet()) {
+      for(TimelineEventSubDoc eventSubDoc: events.get(eventId)) {
+        timelineEntity.addEvent(eventSubDoc.fetchTimelineEvent());
+      }
+      if (this.events.containsKey(eventId)) {
+        this.events.get(eventId).addAll(events.get(eventId));
+      } else {
+        this.events.put(eventId, new HashSet<>(events.get(eventId)));
+      }
+    }
+  }
+
+  public Map<String, String> getConfigs() {
+    return timelineEntity.getConfigs();
+  }
+
+  public void setConfigs(Map<String, String> configs) {
+    timelineEntity.setConfigs(configs);
+  }
+
+  public Map<String, Set<String>> getIsRelatedToEntities() {
+    return timelineEntity.getIsRelatedToEntities();
+  }
+
+  public void setIsRelatedToEntities(Map<String, Set<String>>
+      isRelatedToEntities) {
+    timelineEntity.setIsRelatedToEntities(isRelatedToEntities);
+  }
+
+  public Map<String, Set<String>> getRelatesToEntities() {
+    return timelineEntity.getRelatesToEntities();
+  }
+
+  public void setRelatesToEntities(Map<String, Set<String>> relatesToEntities) {
+    timelineEntity.setRelatesToEntities(relatesToEntities);
+  }
+
+  public String getFlowVersion() {
+    return flowVersion;
+  }
+
+
+  public void setFlowVersion(String flowVersion) {
+    this.flowVersion = flowVersion;
+  }
+
+  public void setIdentifier(TimelineEntity.Identifier identifier) {
+    timelineEntity.setIdentifier(identifier);
+  }
+
+  public void setIdPrefix(long idPrefix) {
+    timelineEntity.setIdPrefix(idPrefix);
+  }
+
+  public String getSubApplicationUser() {
+    return subApplicationUser;
+  }
+
+  public void setSubApplicationUser(String subApplicationUser) {
+    this.subApplicationUser = subApplicationUser;
+  }
+
+  public long getCreatedTime() {
+    if (timelineEntity.getCreatedTime() == null) {
+      return 0;
+    }
+    return timelineEntity.getCreatedTime();
+  }
+
+  public void setCreatedTime(long createdTime) {
+    timelineEntity.setCreatedTime(createdTime);
+  }
+
+  public TimelineContext getContext() {
+    return context;
+  }
+
+  public void setContext(TimelineContext context) {
+    this.context = context;
+  }
+
+  public TimelineEntity fetchTimelineEntity() {
+    return timelineEntity;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java
new file mode 100755
index 0000000..cce64ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+
+import java.util.Map;
+
+/**
+ * This class represents a Sub Document for {@link TimelineEvent}
+ * when creating a new {@link TimelineEntityDocument}.
+ */
+/**
+ * Sub document that wraps a {@link TimelineEvent}.
+ *
+ * <p>Every property except {@code valid} is read from and written to the
+ * wrapped event. Equality and hashing consider the event id only and
+ * tolerate an event whose id has not been set yet.
+ */
+public class TimelineEventSubDoc {
+
+  private final TimelineEvent timelineEvent;
+  // Only written by setValid(boolean); isValid() asks the wrapped event.
+  private boolean valid;
+
+  /**
+   * Creates a sub document backed by a new, empty {@link TimelineEvent}.
+   */
+  public TimelineEventSubDoc() {
+    timelineEvent = new TimelineEvent();
+  }
+
+  /**
+   * Creates a sub document backed by the given event.
+   *
+   * @param timelineEvent the event to wrap; it is kept by reference
+   */
+  public TimelineEventSubDoc(TimelineEvent timelineEvent) {
+    this.timelineEvent = timelineEvent;
+  }
+
+  /**
+   * @return the id of the wrapped event
+   */
+  public String getId() {
+    return timelineEvent.getId();
+  }
+
+  /**
+   * @param eventId the id to set on the wrapped event
+   */
+  public void setId(String eventId) {
+    timelineEvent.setId(eventId);
+  }
+
+  /**
+   * @return the result of {@code isValid()} on the wrapped event; the value
+   *         passed to {@link #setValid(boolean)} is not consulted
+   */
+  public boolean isValid() {
+    return timelineEvent.isValid();
+  }
+
+  /**
+   * Stores the flag on this sub document only; the wrapped event is left
+   * untouched.
+   *
+   * @param valid the flag to store
+   */
+  public void setValid(boolean valid) {
+    this.valid = valid;
+  }
+
+  /**
+   * @return the timestamp of the wrapped event
+   */
+  public long getTimestamp() {
+    return timelineEvent.getTimestamp();
+  }
+
+  /**
+   * @param ts the timestamp to set on the wrapped event
+   */
+  public void setTimestamp(long ts) {
+    timelineEvent.setTimestamp(ts);
+  }
+
+  /**
+   * @return the info map of the wrapped event
+   */
+  public Map<String, Object> getInfo() {
+    return timelineEvent.getInfo();
+  }
+
+  /**
+   * Sets the info map on the wrapped event after passing it through
+   * {@code TimelineServiceHelper.mapCastToHashMap}.
+   *
+   * @param info the info entries to set
+   */
+  public void setInfo(Map<String, Object> info) {
+    timelineEvent.setInfo(TimelineServiceHelper.mapCastToHashMap(info));
+  }
+
+  /**
+   * Hash derived from the event id only, matching {@link #equals(Object)}.
+   *
+   * @return {@code 31 * id.hashCode()}, or 0 when the event has no id
+   */
+  @Override
+  public int hashCode() {
+    final String eventId = timelineEvent.getId();
+    // A sub document without an id must still be usable in hash collections.
+    return eventId == null ? 0 : 31 * eventId.hashCode();
+  }
+
+  /**
+   * Two sub documents are equal when their event ids are equal; two sub
+   * documents that both lack an id are considered equal as well.
+   *
+   * @param obj the object to compare with
+   * @return true when {@code obj} is a {@link TimelineEventSubDoc} with the
+   *         same event id
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TimelineEventSubDoc)) {
+      return false;
+    }
+    final String thisId = timelineEvent.getId();
+    final String otherId = ((TimelineEventSubDoc) obj).getId();
+    return thisId == null ? otherId == null : thisId.equals(otherId);
+  }
+
+  /**
+   * @return the wrapped {@link TimelineEvent} itself; no copy is made
+   */
+  public TimelineEvent fetchTimelineEvent() {
+    return timelineEvent;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java
new file mode 100755
index 0000000..f7d078f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class represents a Sub Document for {@link TimelineMetric} that will be
+ * used when creating new {@link TimelineEntityDocument}.
+ */
+public class TimelineMetricSubDoc {
+
+  private final TimelineMetric timelineMetric;
+  private boolean valid;
+  private long singleDataTimestamp;
+  private Number singleDataValue = 0;
+
+  public TimelineMetricSubDoc() {
+    this.timelineMetric = new TimelineMetric();
+  }
+
+  public TimelineMetricSubDoc(TimelineMetric timelineMetric) {
+    this.timelineMetric = timelineMetric;
+    this.valid = timelineMetric.isValid();
+    if (timelineMetric.getType() == TimelineMetric.Type.SINGLE_VALUE &&
+        timelineMetric.getValues().size() > 0) {
+      this.singleDataTimestamp = timelineMetric.getSingleDataTimestamp();
+      this.singleDataValue = timelineMetric.getSingleDataValue();
+    }
+  }
+
+  /**
+   * Get the real time aggregation operation of this metric.
+   *
+   * @return Real time aggregation operation
+   */
+  public TimelineMetricOperation getRealtimeAggregationOp() {
+    return timelineMetric.getRealtimeAggregationOp();
+  }
+
+  /**
+   * Set the real time aggregation operation of this metric.
+   *
+   * @param op A timeline metric operation that the metric should perform on
+   *           real time aggregations
+   */
+  public void setRealtimeAggregationOp(
+      final TimelineMetricOperation op) {
+    timelineMetric.setRealtimeAggregationOp(op);
+  }
+
+  public String getId() {
+    return timelineMetric.getId();
+  }
+
+  public void setId(String metricId) {
+    timelineMetric.setId(metricId);
+  }
+
+  public void setSingleDataTimestamp(long singleDataTimestamp) {
+    this.singleDataTimestamp = singleDataTimestamp;
+  }
+
+  /**
+   * Get single data timestamp of the metric.
+   *
+   * @return the single data timestamp
+   */
+  public long getSingleDataTimestamp() {
+    if (timelineMetric.getType() == TimelineMetric.Type.SINGLE_VALUE) {
+      return singleDataTimestamp;
+    }
+    return 0;
+  }
+
+  /**
+   * Get single data value of the metric.
+   *
+   * @return the single data value
+   */
+  public Number getSingleDataValue() {
+    if (timelineMetric.getType() == TimelineMetric.Type.SINGLE_VALUE) {
+      return singleDataValue;
+    }
+    return null;
+  }
+
+  public void setSingleDataValue(Number singleDataValue) {
+    this.singleDataValue = singleDataValue;
+  }
+
+  public Map<Long, Number> getValues() {
+    return timelineMetric.getValues();
+  }
+
+  public void setValues(Map<Long, Number> vals) {
+    timelineMetric.setValues(vals);
+  }
+
+  // required by JAXB
+  public TreeMap<Long, Number> getValuesJAXB() {
+    return timelineMetric.getValuesJAXB();
+  }
+
+  public TimelineMetric.Type getType() {
+    return timelineMetric.getType();
+  }
+
+  public void setType(TimelineMetric.Type metricType) {
+    timelineMetric.setType(metricType);
+  }
+
+  public void setValid(boolean valid) {
+    this.valid = valid;
+  }
+
+  public boolean isValid() {
+    return (timelineMetric.getId() != null);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = timelineMetric.getId().hashCode();
+    result = 31 * result + timelineMetric.getType().hashCode();
+    return result;
+  }
+
+  // Only check if timestamp and id are equal
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TimelineMetricSubDoc)) {
+      return false;
+    }
+    TimelineMetricSubDoc otherTimelineMetric = (TimelineMetricSubDoc) obj;
+    if (!this.timelineMetric.getId().equals(otherTimelineMetric.getId())) {
+      return false;
+    }
+    return this.timelineMetric.getType() == otherTimelineMetric.getType();
+  }
+
+  public TimelineMetric fetchTimelineMetric() {
+    return timelineMetric;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/package-info.java
new file mode 100644
index 0000000..dbafa9a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection.document.entity contains
+ * TimelineEntityDocument, which is common to the different TimelineEntity
+ * types, e.g. Application, App Attempt, Container, etc.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivityDocument.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivityDocument.java
new file mode 100755
index 0000000..264bfec
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivityDocument.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity;
+
+
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This doc represents the {@link FlowActivityEntity} which is used for
+ * showing all the flow runs with limited information.
+ */
+/**
+ * Document holding a set of {@link FlowActivitySubDoc} entries together with
+ * a day timestamp, a user and a flow name.
+ */
+public class FlowActivityDocument implements
+    TimelineDocument<FlowActivityDocument> {
+
+  private String id;
+  private final String type = TimelineEntityType.YARN_FLOW_ACTIVITY.toString();
+  private Set<FlowActivitySubDoc> flowActivities = new HashSet<>();
+  private long dayTimestamp;
+  private String user;
+  private String flowName;
+
+  /**
+   * Creates an empty document.
+   */
+  public FlowActivityDocument() {
+  }
+
+  /**
+   * Creates a document that starts with a single flow activity built from
+   * the arguments. No other field of the document is assigned here.
+   *
+   * @param flowName name passed to the new {@link FlowActivitySubDoc}
+   * @param flowVersion version passed to the new {@link FlowActivitySubDoc}
+   * @param flowRunId run id passed to the new {@link FlowActivitySubDoc}
+   */
+  public FlowActivityDocument(String flowName, String flowVersion,
+      long flowRunId) {
+    final FlowActivitySubDoc firstActivity =
+        new FlowActivitySubDoc(flowName, flowVersion, flowRunId);
+    this.flowActivities.add(firstActivity);
+  }
+
+  /**
+   * Folds the given document into this one: id, user and flow name are
+   * taken from the argument, its day timestamp is taken when it is positive,
+   * and its flow activities are added to the ones already held.
+   *
+   * @param flowActivityDocument
+   *          that has to be merged
+   */
+  @Override
+  public void merge(FlowActivityDocument flowActivityDocument) {
+    final long incomingDayTimestamp = flowActivityDocument.getDayTimestamp();
+    if (incomingDayTimestamp > 0) {
+      this.dayTimestamp = incomingDayTimestamp;
+    }
+    this.id = flowActivityDocument.getId();
+    this.user = flowActivityDocument.getUser();
+    this.flowName = flowActivityDocument.getFlowName();
+    this.flowActivities.addAll(flowActivityDocument.getFlowActivities());
+  }
+
+  /**
+   * @return the id of this document
+   */
+  @Override
+  public String getId() {
+    return this.id;
+  }
+
+  /**
+   * @param id the id to store on this document
+   */
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  /**
+   * @return the string form of {@code TimelineEntityType.YARN_FLOW_ACTIVITY}
+   */
+  @Override
+  public String getType() {
+    return this.type;
+  }
+
+  /**
+   * Adds one more flow activity built from the arguments.
+   *
+   * @param flowActivityName name passed to the new sub document
+   * @param flowVersion version passed to the new sub document
+   * @param flowRunId run id passed to the new sub document
+   */
+  public void addFlowActivity(String flowActivityName, String flowVersion,
+      long flowRunId) {
+    final FlowActivitySubDoc activity =
+        new FlowActivitySubDoc(flowActivityName, flowVersion, flowRunId);
+    this.flowActivities.add(activity);
+  }
+
+  /**
+   * @return the set of flow activities held by this document; no copy is made
+   */
+  public Set<FlowActivitySubDoc> getFlowActivities() {
+    return this.flowActivities;
+  }
+
+  /**
+   * @param flowActivities the set that replaces the one currently held
+   */
+  public void setFlowActivities(Set<FlowActivitySubDoc> flowActivities) {
+    this.flowActivities = flowActivities;
+  }
+
+  /**
+   * @return the day timestamp run through {@code TimeUnit.SECONDS.toMillis}
+   */
+  @Override
+  public long getCreatedTime() {
+    return TimeUnit.SECONDS.toMillis(this.dayTimestamp);
+  }
+
+  /**
+   * Does nothing; the argument is ignored.
+   *
+   * @param time unused
+   */
+  @Override
+  public void setCreatedTime(long time) {
+  }
+
+  /**
+   * @return the day timestamp stored on this document
+   */
+  public long getDayTimestamp() {
+    return this.dayTimestamp;
+  }
+
+  /**
+   * @param dayTimestamp the day timestamp to store on this document
+   */
+  public void setDayTimestamp(long dayTimestamp) {
+    this.dayTimestamp = dayTimestamp;
+  }
+
+  /**
+   * @return the user stored on this document
+   */
+  public String getUser() {
+    return this.user;
+  }
+
+  /**
+   * @param user the user to store on this document
+   */
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  /**
+   * @return the flow name stored on this document
+   */
+  public String getFlowName() {
+    return this.flowName;
+  }
+
+  /**
+   * @param flowName the flow name to store on this document
+   */
+  public void setFlowName(String flowName) {
+    this.flowName = flowName;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivitySubDoc.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivitySubDoc.java
new file mode 100755
index 0000000..93b28e0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivitySubDoc.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity;
+
+/**
+ * This is a sub doc which represents each flow.
+ */
+/**
+ * This is a sub doc which represents each flow.
+ *
+ * <p>Two sub docs are equal when their flow run ids match and their flow
+ * versions match ignoring case; the flow name does not take part in
+ * equality. {@link #hashCode()} is computed so that it agrees with that
+ * case-insensitive comparison.
+ */
+public class FlowActivitySubDoc {
+  private String flowName;
+  private String flowVersion;
+  private long flowRunId;
+
+  /**
+   * Creates an empty sub doc.
+   */
+  public FlowActivitySubDoc() {
+  }
+
+  /**
+   * Creates a sub doc with the given values.
+   *
+   * @param flowName the flow name to store
+   * @param flowVersion the flow version to store
+   * @param flowRunId the flow run id to store
+   */
+  public FlowActivitySubDoc(String flowName, String flowVersion,
+      long flowRunId) {
+    this.flowName = flowName;
+    this.flowVersion = flowVersion;
+    this.flowRunId = flowRunId;
+  }
+
+  /**
+   * @return the flow name
+   */
+  public String getFlowName() {
+    return flowName;
+  }
+
+  /**
+   * @return the flow version
+   */
+  public String getFlowVersion() {
+    return flowVersion;
+  }
+
+  /**
+   * @return the flow run id
+   */
+  public long getFlowRunId() {
+    return flowRunId;
+  }
+
+  /**
+   * Hash over the case-folded flow version and the flow run id, so that
+   * objects which are equal under {@link #equals(Object)} always share the
+   * same hash.
+   *
+   * @return the combined hash; a missing flow version contributes 0
+   */
+  @Override
+  public int hashCode() {
+    int result = caseInsensitiveHash(flowVersion);
+    result = 31 * result + Long.hashCode(flowRunId);
+    return result;
+  }
+
+  /**
+   * Only the flow version (ignoring case) and the flow run id are compared.
+   *
+   * @param o the object to compare with
+   * @return true when {@code o} is a {@link FlowActivitySubDoc} with the same
+   *         run id and a flow version that is equal ignoring case, or when
+   *         both flow versions are unset
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof FlowActivitySubDoc)) {
+      return false;
+    }
+    final FlowActivitySubDoc other = (FlowActivitySubDoc) o;
+    if (flowRunId != other.getFlowRunId()) {
+      return false;
+    }
+    final String otherVersion = other.getFlowVersion();
+    return flowVersion == null
+        ? otherVersion == null
+        : flowVersion.equalsIgnoreCase(otherVersion);
+  }
+
+  /**
+   * Hashes a string one char at a time after folding each char to upper and
+   * then lower case, the same folding {@link String#equalsIgnoreCase} applies
+   * when comparing.
+   */
+  private static int caseInsensitiveHash(String value) {
+    if (value == null) {
+      return 0;
+    }
+    int hash = 0;
+    for (int i = 0; i < value.length(); i++) {
+      final char folded =
+          Character.toLowerCase(Character.toUpperCase(value.charAt(i)));
+      hash = 31 * hash + folded;
+    }
+    return hash;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/package-info.java
new file mode 100644
index 0000000..c1127dc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection.document.flowactivity contains
+ * FlowActivityDocument to audit all the flows at a day level.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/FlowRunDocument.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/FlowRunDocument.java
new file mode 100755
index 0000000..8ee87ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/FlowRunDocument.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineMetricSubDoc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This doc represents the flow run information for every job.
+ */
+/**
+ * This doc represents the flow run information for every job. It carries
+ * the cluster, user, flow name, flow run id and flow version, a start and
+ * end time, and a map of metrics keyed by metric id.
+ */
+public class FlowRunDocument implements TimelineDocument<FlowRunDocument> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(FlowRunDocument.class);
+
+  private String id;
+  private final String type = TimelineEntityType.YARN_FLOW_RUN.toString();
+  private String clusterId;
+  private String username;
+  private String flowName;
+  private Long flowRunId;
+  private String flowVersion;
+  private long minStartTime;
+  private long maxEndTime;
+  private final Map<String, TimelineMetricSubDoc>
+      metrics = new HashMap<>();
+
+  /**
+   * Creates an empty document.
+   */
+  public FlowRunDocument() {
+  }
+
+  /**
+   * Creates a document from a collector context and a set of metrics.
+   * Cluster id, user, flow name and flow run id are read from the context;
+   * each metric is wrapped in a {@link TimelineMetricSubDoc} and stored
+   * under its id.
+   *
+   * @param collectorContext source of cluster id, user id, flow name and
+   *          flow run id
+   * @param metrics metrics to store on the new document
+   */
+  public FlowRunDocument(TimelineCollectorContext collectorContext,
+      Set<TimelineMetric> metrics) {
+    this.clusterId = collectorContext.getClusterId();
+    this.username = collectorContext.getUserId();
+    this.flowName = collectorContext.getFlowName();
+    this.flowRunId = collectorContext.getFlowRunId();
+    transformMetrics(metrics);
+  }
+
+  /** Wraps every metric in a sub doc and stores it under the metric id. */
+  private void transformMetrics(Set<TimelineMetric> timelineMetrics) {
+    for (TimelineMetric metric : timelineMetrics) {
+      TimelineMetricSubDoc metricSubDoc = new TimelineMetricSubDoc(metric);
+      this.metrics.put(metric.getId(), metricSubDoc);
+    }
+  }
+
+  /**
+   * Merge the {@link FlowRunDocument} that is passed with the current
+   * document for upsert. Identity fields are taken from the argument, its
+   * start and end times are taken when they are positive, and its metrics
+   * are aggregated into the metrics already held.
+   *
+   * @param flowRunDoc
+   *          that has to be merged
+   */
+  @Override
+  public void merge(FlowRunDocument flowRunDoc) {
+    // NOTE(review): a positive incoming time overwrites the stored one; no
+    // min/max comparison is done despite the field names - confirm intended.
+    if (flowRunDoc.getMinStartTime() > 0) {
+      this.minStartTime = flowRunDoc.getMinStartTime();
+    }
+    if (flowRunDoc.getMaxEndTime() > 0) {
+      this.maxEndTime = flowRunDoc.getMaxEndTime();
+    }
+    this.clusterId = flowRunDoc.getClusterId();
+    this.flowName = flowRunDoc.getFlowName();
+    this.id = flowRunDoc.getId();
+    this.username = flowRunDoc.getUsername();
+    this.flowVersion = flowRunDoc.getFlowVersion();
+    this.flowRunId = flowRunDoc.getFlowRunId();
+    aggregateMetrics(flowRunDoc.getMetrics());
+  }
+
+  /**
+   * Folds the incoming metrics into the stored ones. A metric id that is not
+   * stored yet is added as is; for a known id the incoming metric is
+   * aggregated into the stored metric, unless the incoming metric carries no
+   * values.
+   */
+  private void aggregateMetrics(
+      Map<String, TimelineMetricSubDoc> metricSubDocMap) {
+    for (Map.Entry<String, TimelineMetricSubDoc> entry
+        : metricSubDocMap.entrySet()) {
+      final String metricId = entry.getKey();
+      final TimelineMetricSubDoc incomingSubDoc = entry.getValue();
+      final TimelineMetricSubDoc baseSubDoc = this.metrics.get(metricId);
+      if (baseSubDoc == null) {
+        this.metrics.put(metricId, incomingSubDoc);
+        continue;
+      }
+      final TimelineMetric incomingMetric =
+          incomingSubDoc.fetchTimelineMetric();
+      final TimelineMetric baseMetric = baseSubDoc.fetchTimelineMetric();
+      if (incomingMetric.getValues().isEmpty()) {
+        LOG.debug("No incoming metric to aggregate for : {}",
+            baseMetric.getId());
+        continue;
+      }
+      this.metrics.put(metricId,
+          new TimelineMetricSubDoc(aggregate(incomingMetric, baseMetric)));
+    }
+  }
+
+  /**
+   * Applies the base metric's real time aggregation operation to the two
+   * metrics when that operation is SUM, AVG, MAX or REPLACE; for any other
+   * operation the base metric is returned unchanged.
+   */
+  private TimelineMetric aggregate(TimelineMetric incomingMetric,
+      TimelineMetric baseMetric) {
+    final TimelineMetricOperation operation =
+        baseMetric.getRealtimeAggregationOp();
+    switch (operation) {
+    case SUM:
+    case AVG:
+    case MAX:
+    case REPLACE:
+      return operation.aggregate(incomingMetric, baseMetric, null);
+    default:
+      // Remaining operations are not aggregated here.
+      return baseMetric;
+    }
+  }
+
+  /**
+   * @return the id of this document
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * @param id the id to store on this document
+   */
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  /**
+   * @return the cluster id stored on this document
+   */
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  /**
+   * @param clusterId the cluster id to store on this document
+   */
+  public void setClusterId(String clusterId) {
+    this.clusterId = clusterId;
+  }
+
+  /**
+   * @return the user name stored on this document
+   */
+  public String getUsername() {
+    return username;
+  }
+
+  /**
+   * @param username the user name to store on this document
+   */
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  /**
+   * @return the flow name stored on this document
+   */
+  public String getFlowName() {
+    return flowName;
+  }
+
+  /**
+   * @param flowName the flow name to store on this document
+   */
+  public void setFlowName(String flowName) {
+    this.flowName = flowName;
+  }
+
+  /**
+   * @return the flow run id stored on this document, possibly {@code null}
+   */
+  public Long getFlowRunId() {
+    return flowRunId;
+  }
+
+  /**
+   * @param flowRunId the flow run id to store on this document
+   */
+  public void setFlowRunId(Long flowRunId) {
+    this.flowRunId = flowRunId;
+  }
+
+  /**
+   * @return the metric map held by this document; no copy is made
+   */
+  public Map<String, TimelineMetricSubDoc> getMetrics() {
+    return metrics;
+  }
+
+  /**
+   * Adds all given entries to the metric map held by this document;
+   * entries already present under other ids are kept.
+   *
+   * @param metrics the entries to add
+   */
+  public void setMetrics(Map<String, TimelineMetricSubDoc> metrics) {
+    this.metrics.putAll(metrics);
+  }
+
+  /**
+   * @return a new set holding the {@link TimelineMetric} wrapped by each
+   *         stored sub doc
+   */
+  public Set<TimelineMetric> fetchTimelineMetrics() {
+    Set<TimelineMetric> metricSet = new HashSet<>();
+    for (TimelineMetricSubDoc metricSubDoc : metrics.values()) {
+      metricSet.add(metricSubDoc.fetchTimelineMetric());
+    }
+    return metricSet;
+  }
+
+  /**
+   * @return the start time stored on this document
+   */
+  public long getMinStartTime() {
+    return minStartTime;
+  }
+
+  /**
+   * @param minStartTime the start time to store on this document
+   */
+  public void setMinStartTime(long minStartTime) {
+    this.minStartTime = minStartTime;
+  }
+
+  /**
+   * @return the end time stored on this document
+   */
+  public long getMaxEndTime() {
+    return maxEndTime;
+  }
+
+  /**
+   * @param maxEndTime the end time to store on this document
+   */
+  public void setMaxEndTime(long maxEndTime) {
+    this.maxEndTime = maxEndTime;
+  }
+
+  /**
+   * @return the string form of {@code TimelineEntityType.YARN_FLOW_RUN}
+   */
+  @Override
+  public String getType() {
+    return type;
+  }
+
+  /**
+   * @return the stored start time, which doubles as the created time
+   */
+  @Override
+  public long getCreatedTime() {
+    return minStartTime;
+  }
+
+  /**
+   * Uses the given time as the start time, but only while no start time has
+   * been stored yet (the field is still 0).
+   *
+   * @param createdTime the candidate start time
+   */
+  @Override
+  public void setCreatedTime(long createdTime) {
+    if (minStartTime == 0) {
+      minStartTime = createdTime;
+    }
+  }
+
+  /**
+   * @return the flow version stored on this document
+   */
+  public String getFlowVersion() {
+    return flowVersion;
+  }
+
+  /**
+   * @param flowVersion the flow version to store on this document
+   */
+  public void setFlowVersion(String flowVersion) {
+    this.flowVersion = flowVersion;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/package-info.java
new file mode 100644
index 0000000..43c2ae0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection.document.flowrun contains
+ * FlowRunDocument that stores the flow level information for
+ * each Application and also aggregates the metrics.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/package-info.java
new file mode 100644
index 0000000..ff3cf82
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection.document contains the interface for all
+ * Timeline Documents. Any new document that has to be persisted in
+ * the document store should implement it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/package-info.java
new file mode 100644
index 0000000..663e8de
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection contains different collection types
+ * for storing documents.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreFactory.java
new file mode 100755
index 0000000..04c9bb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.lib;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb.CosmosDBDocumentStoreReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb.CosmosDBDocumentStoreWriter;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+
+import static org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils.getStoreVendor;
+
+/**
+ * Factory methods for instantiating a timeline Document Store reader or
+ * writer. Based on the {@link DocumentStoreVendor} that is configured,
+ * appropriate reader or writer would be instantiated.
+ */
+public final class DocumentStoreFactory {
+
+  /** Static factory holder; instances are never created. */
+  private DocumentStoreFactory() {
+  }
+
+  /**
+   * Builds the {@link DocumentStoreWriter} that matches the
+   * {@link DocumentStoreVendor} resolved from the configuration.
+   *
+   * @param conf
+   *              used to resolve the vendor, validate its settings and
+   *              create the client connection
+   * @param <Document> type of Document the writer is created for,
+   *                  i.e TimelineEntityDocument, FlowActivityDocument etc
+   * @return document store writer
+   * @throws DocumentStoreNotSupportedException if no writer implementation
+   *         exists for the configured {@link DocumentStoreVendor}.
+   * @throws YarnException if the required configs for DocumentStore is missing.
+   */
+  public static <Document extends TimelineDocument>
+      DocumentStoreWriter<Document> createDocumentStoreWriter(
+          Configuration conf) throws YarnException {
+    final DocumentStoreVendor vendor = getStoreVendor(conf);
+    switch (vendor) {
+    case COSMOS_DB:
+      // configuration is validated before the writer is constructed
+      DocumentStoreUtils.validateCosmosDBConf(conf);
+      return new CosmosDBDocumentStoreWriter<>(conf);
+    default:
+      break;
+    }
+    // every vendor without a writer implementation ends up here
+    throw new DocumentStoreNotSupportedException(
+        "Unable to create DocumentStoreWriter for type : " + vendor);
+  }
+
+  /**
+   * Builds the {@link DocumentStoreReader} that matches the
+   * {@link DocumentStoreVendor} resolved from the configuration.
+   *
+   * @param conf
+   *              used to resolve the vendor, validate its settings and
+   *              create the client connection
+   * @param <Document> type of Document the reader is created for,
+   *                  i.e TimelineEntityDocument, FlowActivityDocument etc
+   * @return document store reader
+   * @throws DocumentStoreNotSupportedException if no reader implementation
+   *         exists for the configured {@link DocumentStoreVendor}.
+   * @throws YarnException if the required configs for DocumentStore is missing.
+   */
+  public static <Document extends TimelineDocument>
+      DocumentStoreReader<Document> createDocumentStoreReader(
+          Configuration conf) throws YarnException {
+    final DocumentStoreVendor vendor = getStoreVendor(conf);
+    switch (vendor) {
+    case COSMOS_DB:
+      // configuration is validated before the reader is constructed
+      DocumentStoreUtils.validateCosmosDBConf(conf);
+      return new CosmosDBDocumentStoreReader<>(conf);
+    default:
+      break;
+    }
+    // every vendor without a reader implementation ends up here
+    throw new DocumentStoreNotSupportedException(
+        "Unable to create DocumentStoreReader for type : " + vendor);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreNotSupportedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreNotSupportedException.java
new file mode 100755
index 0000000..7fc9c6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreNotSupportedException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.lib;
+
+/**
+ * Indicates that the document store vendor that was
+ * configured does not belong to one of the {@link DocumentStoreVendor}.
+ */
+public class DocumentStoreNotSupportedException extends
+    UnsupportedOperationException {
+
+  /**
+   * Constructs exception with the specified detail message.
+   * @param message detailed message.
+   */
+  public DocumentStoreNotSupportedException(String message) {
+    super(message);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreVendor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreVendor.java
new file mode 100755
index 0000000..e7612f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreVendor.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.lib;
+
+/**
+ * Represents the different vendors for DocumentStore.
+ */
+public enum DocumentStoreVendor {
+
+  COSMOS_DB,
+  MONGO_DB,
+  ELASTIC_SEARCH;
+
+  /**
+   * Resolves a vendor from its name, ignoring case.
+   *
+   * @param storeTypeStr vendor name to look up; {@code null} matches no
+   *                     constant
+   * @return the constant whose name equals {@code storeTypeStr} ignoring case
+   * @throws DocumentStoreNotSupportedException if no constant matches
+   */
+  public static DocumentStoreVendor getStoreType(String storeTypeStr) {
+    for (DocumentStoreVendor storeType : DocumentStoreVendor.values()) {
+      if (storeType.name().equalsIgnoreCase(storeTypeStr)) {
+        // Return the matched constant itself. Re-resolving it through
+        // valueOf(storeTypeStr.toUpperCase()) depends on the default locale
+        // (a Turkish locale upper-cases 'i' to a dotted capital I) and can
+        // throw IllegalArgumentException for a name that has just matched.
+        return storeType;
+      }
+    }
+    throw new DocumentStoreNotSupportedException(
+        storeTypeStr + " is not a valid DocumentStoreVendor");
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/package-info.java
new file mode 100644
index 0000000..83a8479
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.lib contains the factory class for instantiating
+ * DocumentStore reader and writer clients based on the configured
+ * DocumentStoreVendor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.lib;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/package-info.java
new file mode 100644
index 0000000..2803777
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore
+ * contains DocumentStore Reader and Writer Implementation of TimelineService
+ * for reading and writing documents from DocumentStore.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DocumentStoreReader.java
new file mode 100755
index 0000000..23f96cc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DocumentStoreReader.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader;
+
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.NoDocumentFoundException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Every {@link DocumentStoreVendor} has to implement this for creating
+ * a reader to its backend.
+ */
+public interface DocumentStoreReader<Document extends TimelineDocument>
+    extends AutoCloseable {
+
+  /**
+   * Reads a single document from a collection.
+   *
+   * @param collectionName name of the collection to read from
+   * @param context timeline reader context of the request
+   * @param documentClass class of the document to be returned
+   * @return the document that was read
+   * @throws NoDocumentFoundException presumably when no document matches
+   *         the context; the exact condition is up to the implementation
+   */
+  Document readDocument(
+      String collectionName,
+      TimelineReaderContext context,
+      Class<Document> documentClass) throws NoDocumentFoundException;
+
+  /**
+   * Reads a list of documents from a collection.
+   *
+   * @param collectionName name of the collection to read from
+   * @param context timeline reader context of the request
+   * @param documentClass class of the documents to be returned
+   * @param documentsSize size passed by the caller; presumably an upper
+   *        bound on the number of documents returned
+   * @return the documents that were read
+   * @throws NoDocumentFoundException presumably when no document matches
+   *         the context; the exact condition is up to the implementation
+   */
+  List<Document> readDocumentList(
+      String collectionName,
+      TimelineReaderContext context,
+      Class<Document> documentClass,
+      long documentsSize) throws NoDocumentFoundException;
+
+  /**
+   * Fetches entity types from a collection.
+   *
+   * @param collectionName name of the collection to read from
+   * @param context timeline reader context of the request
+   * @return set of entity type names
+   */
+  Set<String> fetchEntityTypes(
+      String collectionName,
+      TimelineReaderContext context);
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/TimelineCollectionReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/TimelineCollectionReader.java
new file mode 100755
index 0000000..a8c2c8e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/TimelineCollectionReader.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivitySubDoc;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is a generic Collection reader for reading documents belonging to a
+ * {@link CollectionType} under a specific {@link DocumentStoreVendor} backend.
+ */
+public class TimelineCollectionReader {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TimelineCollectionReader.class);
+
+  // One reader per document type; all are created from the same
+  // configuration by the DocumentStoreFactory.
+  private final DocumentStoreReader<TimelineEntityDocument>
+      genericEntityDocReader;
+  private final DocumentStoreReader<FlowRunDocument>
+      flowRunDocReader;
+  private final DocumentStoreReader<FlowActivityDocument>
+      flowActivityDocReader;
+
+  /**
+   * Creates the readers for entity, flow run and flow activity documents.
+   * @param conf
+   *            passed to {@link DocumentStoreFactory} for creating each reader
+   * @throws YarnException if the factory fails to create a reader
+   */
+  public TimelineCollectionReader(
+      Configuration conf) throws YarnException {
+    LOG.info("Initializing TimelineCollectionReader...");
+    genericEntityDocReader = DocumentStoreFactory
+        .createDocumentStoreReader(conf);
+    flowRunDocReader = DocumentStoreFactory
+        .createDocumentStoreReader(conf);
+    flowActivityDocReader = DocumentStoreFactory
+        .createDocumentStoreReader(conf);
+  }
+
+  /**
+   * Read a document from {@link DocumentStoreVendor} backend for
+   * a {@link CollectionType}. The collection is chosen from the entity type
+   * of the context; flow run and flow activity documents are converted into
+   * a {@link TimelineEntityDocument} before being returned.
+   * @param context
+   *               of the timeline reader
+   * @return TimelineEntityDocument as response
+   * @throws IOException on error while reading
+   */
+  public TimelineEntityDocument readDocument(
+      TimelineReaderContext context) throws IOException {
+    LOG.debug("Fetching document for entity type {}", context.getEntityType());
+    // NOTE(review): TimelineEntityType.valueOf throws
+    // IllegalArgumentException for a name that is not a TimelineEntityType
+    // constant, so the default branch is only reached by the remaining
+    // constants. Confirm whether other entity types are expected here.
+    switch (TimelineEntityType.valueOf(context.getEntityType())) {
+    case YARN_APPLICATION:
+      return genericEntityDocReader.readDocument(
+          CollectionType.APPLICATION.getCollectionName(), context,
+          TimelineEntityDocument.class);
+    case YARN_FLOW_RUN:
+      FlowRunDocument flowRunDoc = flowRunDocReader.readDocument(
+          CollectionType.FLOW_RUN.getCollectionName(), context,
+          FlowRunDocument.class);
+      FlowRunEntity flowRun = createFlowRunEntity(flowRunDoc);
+      return new TimelineEntityDocument(flowRun);
+    case YARN_FLOW_ACTIVITY:
+      // Flow activity documents are read from the FLOW_ACTIVITY collection,
+      // the same collection readDocuments() uses for this entity type.
+      FlowActivityDocument flowActivityDoc = flowActivityDocReader
+          .readDocument(CollectionType.FLOW_ACTIVITY.getCollectionName(),
+              context, FlowActivityDocument.class);
+      FlowActivityEntity flowActivity = createFlowActivityEntity(context,
+          flowActivityDoc);
+      return new TimelineEntityDocument(flowActivity);
+    default:
+      return genericEntityDocReader.readDocument(
+          CollectionType.ENTITY.getCollectionName(), context,
+          TimelineEntityDocument.class);
+    }
+  }
+
+  /**
+   * Read a list of  documents from {@link DocumentStoreVendor} backend for
+   * a {@link CollectionType}. The collection is chosen from the entity type
+   * of the context; flow run and flow activity documents are converted into
+   * {@link TimelineEntityDocument}s before being returned.
+   * @param context
+   *               of the timeline reader
+   * @param documentsSize
+   *               to limit
+   * @return List of TimelineEntityDocument as response
+   * @throws IOException on error while reading
+   */
+  public List<TimelineEntityDocument> readDocuments(
+      TimelineReaderContext context, long documentsSize) throws IOException {
+    List<TimelineEntityDocument> entityDocs = new ArrayList<>();
+    LOG.debug("Fetching documents for entity type {}", context.getEntityType());
+    // NOTE(review): same valueOf restriction as in readDocument().
+    switch (TimelineEntityType.valueOf(context.getEntityType())) {
+    case YARN_APPLICATION:
+      return genericEntityDocReader.readDocumentList(
+          CollectionType.APPLICATION.getCollectionName(), context,
+          TimelineEntityDocument.class, documentsSize);
+    case YARN_FLOW_RUN:
+      List<FlowRunDocument> flowRunDocs = flowRunDocReader.readDocumentList(
+          CollectionType.FLOW_RUN.getCollectionName(), context,
+          FlowRunDocument.class, documentsSize);
+      for (FlowRunDocument flowRunDoc : flowRunDocs) {
+        entityDocs.add(new TimelineEntityDocument(createFlowRunEntity(
+            flowRunDoc)));
+      }
+      return entityDocs;
+    case YARN_FLOW_ACTIVITY:
+      List<FlowActivityDocument> flowActivityDocs = flowActivityDocReader
+          .readDocumentList(CollectionType.FLOW_ACTIVITY.getCollectionName(),
+              context, FlowActivityDocument.class, documentsSize);
+      for (FlowActivityDocument flowActivityDoc : flowActivityDocs) {
+        entityDocs.add(new TimelineEntityDocument(
+            createFlowActivityEntity(context, flowActivityDoc)));
+      }
+      return entityDocs;
+    default:
+      return genericEntityDocReader.readDocumentList(
+          CollectionType.ENTITY.getCollectionName(), context,
+          TimelineEntityDocument.class, documentsSize);
+    }
+  }
+
+  /**
+   * Fetches the list of Entity Types i.e (YARN_CONTAINER,
+   * YARN_APPLICATION_ATTEMPT etc.) for an application Id.
+   * @param context
+   *               of the timeline reader
+   * @return List of EntityTypes as response
+   */
+  public Set<String> fetchEntityTypes(
+      TimelineReaderContext context) {
+    LOG.debug("Fetching all entity-types for appId : {}", context.getAppId());
+    return genericEntityDocReader.fetchEntityTypes(
+        CollectionType.ENTITY.getCollectionName(), context);
+  }
+
+  /**
+   * Converts a flow activity document into a {@link FlowActivityEntity},
+   * adding one {@link FlowRunEntity} per activity stored in the document.
+   * @param context
+   *               of the timeline reader, supplies the cluster id
+   * @param flowActivityDoc
+   *               document to convert
+   * @return the populated flow activity entity
+   */
+  private FlowActivityEntity createFlowActivityEntity(
+      TimelineReaderContext context, FlowActivityDocument flowActivityDoc) {
+    FlowActivityEntity flowActivity = new FlowActivityEntity(
+        context.getClusterId(), flowActivityDoc.getDayTimestamp(),
+        flowActivityDoc.getUser(), flowActivityDoc.getFlowName());
+    flowActivity.setId(flowActivityDoc.getId());
+    // get the list of run ids along with the version that are associated with
+    // this flow on this day
+    for (FlowActivitySubDoc activity : flowActivityDoc
+        .getFlowActivities()) {
+      FlowRunEntity flowRunEntity = new FlowRunEntity();
+      flowRunEntity.setUser(flowActivityDoc.getUser());
+      flowRunEntity.setName(activity.getFlowName());
+      flowRunEntity.setRunId(activity.getFlowRunId());
+      flowRunEntity.setVersion(activity.getFlowVersion());
+      // NOTE(review): assigns the entity's own id back to itself; presumably
+      // relies on getId() deriving the id from the fields set above - confirm.
+      flowRunEntity.setId(flowRunEntity.getId());
+      flowActivity.addFlowRun(flowRunEntity);
+    }
+    flowActivity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
+        flowActivityDoc.getId());
+    flowActivity.setCreatedTime(flowActivityDoc.getDayTimestamp());
+    return flowActivity;
+  }
+
+  /**
+   * Converts a flow run document into a {@link FlowRunEntity}. Start time,
+   * end time and version are only copied when the document carries them.
+   * @param flowRunDoc
+   *               document to convert
+   * @return the populated flow run entity
+   */
+  private FlowRunEntity createFlowRunEntity(FlowRunDocument flowRunDoc) {
+    FlowRunEntity flowRun = new FlowRunEntity();
+    flowRun.setRunId(flowRunDoc.getFlowRunId());
+    flowRun.setUser(flowRunDoc.getUsername());
+    flowRun.setName(flowRunDoc.getFlowName());
+
+    // read the start time
+    if (flowRunDoc.getMinStartTime() > 0) {
+      flowRun.setStartTime(flowRunDoc.getMinStartTime());
+    }
+
+    // read the end time if available
+    if (flowRunDoc.getMaxEndTime() > 0) {
+      flowRun.setMaxEndTime(flowRunDoc.getMaxEndTime());
+    }
+
+    // read the flow version
+    if (!DocumentStoreUtils.isNullOrEmpty(flowRunDoc.getFlowVersion())) {
+      flowRun.setVersion(flowRunDoc.getFlowVersion());
+    }
+    flowRun.setMetrics(flowRunDoc.fetchTimelineMetrics());
+    flowRun.setId(flowRunDoc.getId());
+    flowRun.getInfo().put(TimelineReaderUtils.FROMID_KEY, flowRunDoc.getId());
+    return flowRun;
+  }
+
+  /**
+   * Closes all three underlying readers. Every reader is closed even when
+   * closing an earlier one fails.
+   * @throws Exception if closing any of the readers fails
+   */
+  public void close() throws Exception {
+    // nested try/finally keeps the original close order while making sure
+    // a failing close() does not leave the remaining readers open
+    try {
+      genericEntityDocReader.close();
+    } finally {
+      try {
+        flowRunDocReader.close();
+      } finally {
+        flowActivityDocReader.close();
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java
new file mode 100755
index 0000000..64468ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb;
+
+import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.DocumentClient;
+import com.microsoft.azure.documentdb.FeedOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.NoDocumentFoundException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * This is the Document Store Reader implementation for
+ * {@link DocumentStoreVendor#COSMOS_DB}.
+ */
+public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
+    implements DocumentStoreReader<TimelineDoc> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CosmosDBDocumentStoreReader.class);
+  private static final int DEFAULT_DOCUMENTS_SIZE = 1;
+
+  private static DocumentClient client;
+  private final String databaseName;
+  private final static String COLLECTION_LINK = "/dbs/%s/colls/%s";
+  private final static String SELECT_TOP_FROM_COLLECTION = "SELECT TOP %d * " +
+      "FROM %s c";
+  private final static String SELECT_ALL_FROM_COLLECTION =
+      "SELECT  * FROM %s c";
+  private final static String SELECT_DISTINCT_TYPES_FROM_COLLECTION =
+      "SELECT  distinct c.type FROM %s c";
+  private static final String ENTITY_TYPE_COLUMN = "type";
+  private final static String WHERE_CLAUSE = " WHERE ";
+  private final static String AND_OPERATOR = " AND ";
+  private final static String CONTAINS_FUNC_FOR_ID = " CONTAINS(c.id, \"%s\") ";
+  private final static String CONTAINS_FUNC_FOR_TYPE = " CONTAINS(c.type, " +
+      "\"%s\") ";
+  private final static String ORDER_BY_CLAUSE = " ORDER BY c.createdTime";
+
+  public CosmosDBDocumentStoreReader(Configuration conf) {
+    LOG.info("Initializing Cosmos DB DocumentStoreReader...");
+    databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
+    // making CosmosDB Client Singleton
+    if (client == null) {
+      synchronized (this) {
+        if (client == null) {
+          LOG.info("Creating Cosmos DB Client...");
+          client = DocumentStoreUtils.createCosmosDBClient(conf);
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads up to {@code size} documents matching the reader context.
+   *
+   * @throws NoDocumentFoundException if the query yields no documents.
+   */
+  @Override
+  public List<TimelineDoc> readDocumentList(String collectionName,
+      TimelineReaderContext context, final Class<TimelineDoc> timelineDocClass,
+      long size) throws NoDocumentFoundException {
+    final List<TimelineDoc> documents = queryDocuments(collectionName,
+        context, timelineDocClass, size);
+    if (documents.isEmpty()) {
+      throw new NoDocumentFoundException("No documents were found while " +
+          "querying Collection : " + collectionName);
+    }
+    return documents;
+  }
+
+  /**
+   * Collects the distinct values of the {@code type} column for the
+   * documents of the collection that match the reader context.
+   */
+  @Override
+  public Set<String> fetchEntityTypes(String collectionName,
+      TimelineReaderContext context) {
+    final StringBuilder selectClause = new StringBuilder(
+        String.format(SELECT_DISTINCT_TYPES_FROM_COLLECTION, collectionName));
+    final String sqlQuery = addPredicates(context, collectionName,
+        selectClause);
+
+    LOG.debug("Querying Collection : {} , with query {}", collectionName,
+        sqlQuery);
+
+    final Set<String> types = new HashSet<>();
+    final String collectionLink =
+        String.format(COLLECTION_LINK, databaseName, collectionName);
+    for (Iterator<Document> docs = client.queryDocuments(collectionLink,
+        sqlQuery, null).getQueryIterator(); docs.hasNext();) {
+      types.add(docs.next().getString(ENTITY_TYPE_COLUMN));
+    }
+    return types;
+  }
+
+  /**
+   * Reads the first document matching the reader context.
+   *
+   * @throws NoDocumentFoundException if the query yields no documents.
+   */
+  @Override
+  public TimelineDoc readDocument(String collectionName, TimelineReaderContext
+      context, final Class<TimelineDoc> timelineDocClass)
+      throws  NoDocumentFoundException {
+    final List<TimelineDoc> documents = queryDocuments(collectionName,
+        context, timelineDocClass, DEFAULT_DOCUMENTS_SIZE);
+    if (documents.isEmpty()) {
+      throw new NoDocumentFoundException("No documents were found while " +
+          "querying Collection : " + collectionName);
+    }
+    return documents.get(0);
+  }
+
+  /**
+   * Runs the predicate query built from the reader context against the
+   * collection and converts every returned document to {@code docClass}.
+   * A document whose created time is 0 gets the store timestamp instead,
+   * when one is available.
+   */
+  private List<TimelineDoc> queryDocuments(String collectionName,
+      TimelineReaderContext context, final Class<TimelineDoc> docClass,
+      final long maxDocumentsSize) {
+    final String sqlQuery = buildQueryWithPredicates(context, collectionName,
+        maxDocumentsSize);
+    final List<TimelineDoc> converted = new ArrayList<>();
+    LOG.debug("Querying Collection : {} , with query {}", collectionName,
+        sqlQuery);
+
+    final FeedOptions options = new FeedOptions();
+    options.setPageSize((int) maxDocumentsSize);
+    final String collectionLink =
+        String.format(COLLECTION_LINK, databaseName, collectionName);
+    for (Iterator<Document> docs = client.queryDocuments(collectionLink,
+        sqlQuery, options).getQueryIterator(); docs.hasNext();) {
+      final Document rawDoc = docs.next();
+      final TimelineDoc timelineDoc = rawDoc.toObject(docClass);
+      final boolean missingCreatedTime = timelineDoc.getCreatedTime() == 0;
+      if (missingCreatedTime && rawDoc.getTimestamp() != null) {
+        timelineDoc.setCreatedTime(rawDoc.getTimestamp().getTime());
+      }
+      converted.add(timelineDoc);
+    }
+    return converted;
+  }
+
+  private String buildQueryWithPredicates(TimelineReaderContext context,
+      String collectionName, long size) {
+    StringBuilder queryStrBuilder = new StringBuilder();
+    if (size == -1) {
+      queryStrBuilder.append(String.format(SELECT_ALL_FROM_COLLECTION,
+          collectionName));
+    } else {
+      queryStrBuilder.append(String.format(SELECT_TOP_FROM_COLLECTION, size,
+          collectionName));
+    }
+
+    return addPredicates(context, collectionName, queryStrBuilder);
+  }
+
+  private String addPredicates(TimelineReaderContext context,
+      String collectionName, StringBuilder queryStrBuilder) {
+    boolean hasPredicate = false;
+
+    queryStrBuilder.append(WHERE_CLAUSE);
+
+    if (context.getClusterId() != null) {
+      hasPredicate = true;
+      queryStrBuilder.append(String.format(CONTAINS_FUNC_FOR_ID,
+          context.getClusterId()));
+    }
+    if (context.getUserId() != null) {
+      hasPredicate = true;
+      queryStrBuilder.append(AND_OPERATOR)
+          .append(String.format(CONTAINS_FUNC_FOR_ID, context.getUserId()));
+    }
+    if (context.getFlowName() != null) {
+      hasPredicate = true;
+      queryStrBuilder.append(AND_OPERATOR)
+          .append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowName()));
+    }
+    if (context.getAppId() != null) {
+      hasPredicate = true;
+      queryStrBuilder.append(AND_OPERATOR)
+          .append(String.format(CONTAINS_FUNC_FOR_ID, context.getAppId()));
+    }
+    if (context.getEntityId() != null) {
+      hasPredicate = true;
+      queryStrBuilder.append(AND_OPERATOR)
+          .append(String.format(CONTAINS_FUNC_FOR_ID, context.getEntityId()));
+    }
+    if (context.getFlowRunId() != null) {
+      hasPredicate = true;
+      queryStrBuilder.append(AND_OPERATOR)
+          .append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowRunId()));
+    }
+    if (context.getEntityType() != null){
+      hasPredicate = true;
+      queryStrBuilder.append(AND_OPERATOR)
+          .append(String.format(CONTAINS_FUNC_FOR_TYPE,
+              context.getEntityType()));
+    }
+
+    if (hasPredicate) {
+      queryStrBuilder.append(ORDER_BY_CLAUSE);
+      LOG.debug("CosmosDB Sql Query with predicates : {}", queryStrBuilder);
+      return queryStrBuilder.toString();
+    }
+    throw new IllegalArgumentException("The TimelineReaderContext does not " +
+        "have enough information to query documents for Collection : " +
+        collectionName);
+  }
+
+  /**
+   * Closes the shared Cosmos DB client, if it is still open, and clears the
+   * static reference.
+   */
+  @Override
+  public void close() {
+    // The client is static, so guard it with the class monitor rather than
+    // the monitor of this particular reader instance.
+    synchronized (CosmosDBDocumentStoreReader.class) {
+      if (client != null) {
+        LOG.info("Closing Cosmos DB Client...");
+        client.close();
+        client = null;
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/package-info.java
new file mode 100644
index 0000000..3a20892
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore
+ * .reader.cosmosdb DocumentStore Reader implementation for CosmosDB.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/package-info.java
new file mode 100644
index 0000000..e36bf51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader
+ * contains the implementation of different DocumentStore reader clients
+ * for DocumentVendor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DocumentStoreWriter.java
new file mode 100755
index 0000000..31b2710
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DocumentStoreWriter.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer;
+
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+
+/**
+ * Every {@link DocumentStoreVendor} has to implement this interface to
+ * provide a writer for its backend.
+ *
+ * @param <Document> type of the documents accepted by the writer.
+ */
+public interface DocumentStoreWriter<Document> extends AutoCloseable {
+
+  /**
+   * Creates the database used by the writer.
+   * NOTE(review): presumably expected to do nothing when the database
+   * already exists - confirm against the vendor implementations.
+   */
+  void createDatabase();
+
+  /**
+   * Creates the collection with the given name.
+   * NOTE(review): presumably expected to do nothing when the collection
+   * already exists - confirm against the vendor implementations.
+   *
+   * @param collectionName name of the collection to create.
+   */
+  void createCollection(String collectionName);
+
+  /**
+   * Writes the document to the collection of the given type.
+   *
+   * @param document document to write.
+   * @param collectionType type of the collection the document belongs to.
+   */
+  void writeDocument(Document document, CollectionType collectionType);
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/TimelineCollectionWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/TimelineCollectionWriter.java
new file mode 100755
index 0000000..5c3c56d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/TimelineCollectionWriter.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is a generic Collection Writer that can be used for writing documents
+ * belonging to different {@link CollectionType} under a specific
+ * {@link DocumentStoreVendor} backend.
+ */
+public class TimelineCollectionWriter<Document extends TimelineDocument> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TimelineCollectionWriter.class);
+
+  private final static String DOCUMENT_BUFFER_SIZE_CONF =
+      "yarn.timeline-service.document-buffer.size";
+  private static final int DEFAULT_BUFFER_SIZE = 1024;
+  private static final int AWAIT_TIMEOUT_SECS = 5;
+  private static final PerNodeAggTimelineCollectorMetrics METRICS =
+      PerNodeAggTimelineCollectorMetrics.getInstance();
+
+  private final CollectionType collectionType;
+  private final DocumentStoreWriter<Document> documentStoreWriter;
+  private final Map<String, Document> documentsBuffer;
+  private final int maxBufferSize;
+  private final ScheduledExecutorService scheduledDocumentsFlusher;
+  private final ExecutorService documentsBufferFullFlusher;
+
+  public TimelineCollectionWriter(CollectionType collectionType,
+      Configuration conf) throws YarnException {
+    LOG.info("Initializing TimelineCollectionWriter for collection type : {}",
+        collectionType);
+    int flushIntervalSecs = conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
+        YarnConfiguration
+            .DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
+    maxBufferSize = conf.getInt(DOCUMENT_BUFFER_SIZE_CONF, DEFAULT_BUFFER_SIZE);
+    documentsBuffer = new HashMap<>(maxBufferSize);
+    this.collectionType = collectionType;
+    documentStoreWriter = DocumentStoreFactory.createDocumentStoreWriter(conf);
+    scheduledDocumentsFlusher = Executors.newSingleThreadScheduledExecutor();
+    scheduledDocumentsFlusher.scheduleAtFixedRate(this::flush,
+        flushIntervalSecs, flushIntervalSecs, TimeUnit.SECONDS);
+    documentsBufferFullFlusher = Executors.newSingleThreadExecutor();
+  }
+
+  /**
+   * Buffers the document for a later write to the document store. A
+   * document whose id is already buffered is merged into the buffered one.
+   *
+   * @param timelineDocument document to buffer.
+   */
+  @SuppressWarnings("unchecked")
+  public void writeDocument(Document timelineDocument) {
+    /*
+     * The documentsBuffer holds the most frequently used documents so that
+     * upserts on them can be merged in memory. Whenever the buffer becomes
+     * full, or the scheduledDocumentsFlusher invokes flush() periodically,
+     * all the buffered documents are written to the DocumentStore in a
+     * background thread.
+     */
+    long startTime = Time.monotonicNow();
+
+    synchronized(documentsBuffer) {
+      // If the buffer is full, copy it to a flush buffer in order to flush.
+      // Use >= rather than == so that the check still fires when the size is
+      // already past the limit (e.g. a configured buffer size of 0).
+      if (documentsBuffer.size() >= maxBufferSize) {
+        final Map<String, Document> flushedBuffer = copyToFlushBuffer();
+        //flush all documents from flushBuffer in background
+        documentsBufferFullFlusher.execute(() -> flush(flushedBuffer));
+      }
+      Document prevDocument = documentsBuffer.get(timelineDocument.getId());
+      // check if Document exists inside documentsBuffer
+      if (prevDocument != null) {
+        prevDocument.merge(timelineDocument);
+      } else { // else treat this as a new document
+        prevDocument = timelineDocument;
+      }
+      documentsBuffer.put(prevDocument.getId(), prevDocument);
+    }
+    METRICS.addAsyncPutEntitiesLatency(Time.monotonicNow() - startTime,
+        true);
+  }
+
+  private Map<String, Document> copyToFlushBuffer() {
+    Map<String, Document> flushBuffer = new HashMap<>();
+    synchronized(documentsBuffer) {
+      if (documentsBuffer.size() > 0) {
+        flushBuffer.putAll(documentsBuffer);
+        documentsBuffer.clear();
+      }
+    }
+    return flushBuffer;
+  }
+
+  /**
+   * Writes every document of the given buffer to the document store.
+   */
+  private void flush(Map<String, Document> flushBuffer) {
+    flushBuffer.values().forEach(
+        doc -> documentStoreWriter.writeDocument(doc, collectionType));
+  }
+
+  /**
+   * Drains the documents buffer and writes its content to the document
+   * store on the calling thread.
+   */
+  public void flush() {
+    final Map<String, Document> pendingDocs = copyToFlushBuffer();
+    flush(pendingDocs);
+  }
+
+  /**
+   * Stops both flusher executors, flushes the documents that are still
+   * buffered and closes the document store writer.
+   *
+   * The writer is closed in a finally block so that it is released even if
+   * the final flush fails or the wait for the executors is interrupted.
+   *
+   * @throws Exception if flushing, waiting for the executors or closing the
+   *                   document store writer fails.
+   */
+  public void close() throws Exception {
+    scheduledDocumentsFlusher.shutdown();
+    documentsBufferFullFlusher.shutdown();
+
+    try {
+      flush();
+
+      scheduledDocumentsFlusher.awaitTermination(
+          AWAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
+      documentsBufferFullFlusher.awaitTermination(
+          AWAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
+    } finally {
+      documentStoreWriter.close();
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java
new file mode 100755
index 0000000..b345276
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
+
+
+import com.microsoft.azure.documentdb.AccessCondition;
+import com.microsoft.azure.documentdb.AccessConditionType;
+import com.microsoft.azure.documentdb.Database;
+import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.DocumentClient;
+import com.microsoft.azure.documentdb.DocumentClientException;
+import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.RequestOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the Document Store Writer implementation for
+ * {@link DocumentStoreVendor#COSMOS_DB}.
+ */
+public class CosmosDBDocumentStoreWriter<TimelineDoc extends TimelineDocument>
+    implements DocumentStoreWriter<TimelineDoc> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CosmosDBDocumentStoreWriter.class);
+
+  private static DocumentClient client;
+  private final String databaseName;
+  private static final PerNodeAggTimelineCollectorMetrics METRICS =
+      PerNodeAggTimelineCollectorMetrics.getInstance();
+  private static final String DATABASE_LINK = "/dbs/%s";
+  private static final String COLLECTION_LINK = DATABASE_LINK + "/colls/%s";
+  private static final String DOCUMENT_LINK = COLLECTION_LINK + "/docs/%s";
+
+  public CosmosDBDocumentStoreWriter(Configuration conf) {
+    LOG.info("Initializing Cosmos DB DocumentStoreWriter...");
+    databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
+    // making CosmosDB Client Singleton
+    if (client == null) {
+      synchronized (this) {
+        if (client == null) {
+          LOG.info("Creating Cosmos DB Client...");
+          client = DocumentStoreUtils.createCosmosDBClient(conf);
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates the configured database unless reading it shows that it already
+   * exists. Failures are logged and not propagated.
+   */
+  @Override
+  public void createDatabase() {
+    try {
+      client.readDatabase(String.format(
+          DATABASE_LINK, databaseName), new RequestOptions());
+      LOG.info("Database {} already exists.", databaseName);
+      return;
+    } catch (DocumentClientException docExceptionOnRead) {
+      // only a 404 on read means the database still has to be created
+      if (docExceptionOnRead.getStatusCode() !=  404) {
+        LOG.error("Error while reading Database : {}", databaseName,
+            docExceptionOnRead);
+        return;
+      }
+    }
+
+    LOG.info("Creating new Database : {}", databaseName);
+    final Database newDatabase = new Database();
+    newDatabase.setId(databaseName);
+    try {
+      client.createDatabase(newDatabase, new RequestOptions());
+    } catch (DocumentClientException docExceptionOnCreate) {
+      LOG.error("Unable to create new Database : {}", databaseName,
+          docExceptionOnCreate);
+    }
+  }
+
+  /**
+   * Creates the named collection under the configured database unless
+   * reading it shows that it already exists. Failures are logged and not
+   * propagated.
+   */
+  @Override
+  public void createCollection(final String collectionName) {
+    LOG.info("Creating Timeline Collection : {} for Database : {}",
+        collectionName, databaseName);
+    try {
+      client.readCollection(String.format(COLLECTION_LINK, databaseName,
+          collectionName), new RequestOptions());
+      LOG.info("Collection {} already exists.", collectionName);
+      return;
+    } catch (DocumentClientException docExceptionOnRead) {
+      // only a 404 on read means the collection still has to be created
+      if (docExceptionOnRead.getStatusCode() != 404) {
+        LOG.error("Error while reading Collection : {} under Database : {}",
+            collectionName, databaseName, docExceptionOnRead);
+        return;
+      }
+    }
+
+    final DocumentCollection newCollection = new DocumentCollection();
+    newCollection.setId(collectionName);
+    LOG.info("Creating collection {} under Database {}",
+        collectionName, databaseName);
+    try {
+      client.createCollection(
+          String.format(DATABASE_LINK, databaseName),
+          newCollection, new RequestOptions());
+    } catch (DocumentClientException docExceptionOnCreate) {
+      LOG.error("Unable to create Collection : {} under Database : {}",
+          collectionName, databaseName, docExceptionOnCreate);
+    }
+  }
+
+  /**
+   * Upserts the document into the collection of the given type and records
+   * the latency of the operation. Failures are logged and not propagated.
+   *
+   * @param timelineDoc document to upsert.
+   * @param collectionType type of the collection the document belongs to.
+   */
+  @Override
+  public void writeDocument(final TimelineDoc timelineDoc,
+      final CollectionType collectionType) {
+    // arguments follow the placeholder order: collection, type, database
+    LOG.debug("Upserting document under collection : {} with  entity type : " +
+        "{} under Database {}", collectionType.getCollectionName(),
+        timelineDoc.getType(), databaseName);
+    boolean succeeded = false;
+    long startTime = Time.monotonicNow();
+    try {
+      upsertDocument(collectionType, timelineDoc);
+      succeeded = true;
+    } catch (Exception e) {
+      LOG.error("Unable to perform upsert for Document Id : {} under " +
+          "Collection : {} under Database {}", timelineDoc.getId(),
+          collectionType.getCollectionName(), databaseName, e);
+    } finally {
+      long latency = Time.monotonicNow() - startTime;
+      METRICS.addPutEntitiesLatency(latency, succeeded);
+    }
+  }
+
+  /**
+   * Merges the document with the version currently stored, if any, and
+   * upserts the result using the ETag of the stored version as an IfMatch
+   * access condition. A 409 response triggers a retry; any other client
+   * error is logged.
+   *
+   * @param collectionType type of the collection the document belongs to.
+   * @param timelineDoc document carrying the updates to apply.
+   */
+  @SuppressWarnings("unchecked")
+  private void upsertDocument(final  CollectionType collectionType,
+      final TimelineDoc timelineDoc) {
+    final String collectionLink = String.format(COLLECTION_LINK, databaseName,
+        collectionType.getCollectionName());
+    RequestOptions requestOptions  = new RequestOptions();
+    AccessCondition accessCondition = new AccessCondition();
+    StringBuilder eTagStrBuilder = new StringBuilder();
+
+    TimelineDoc updatedTimelineDoc = applyUpdatesOnPrevDoc(collectionType,
+        timelineDoc, eTagStrBuilder);
+
+    accessCondition.setCondition(eTagStrBuilder.toString());
+    accessCondition.setType(AccessConditionType.IfMatch);
+    requestOptions.setAccessCondition(accessCondition);
+
+    try {
+      client.upsertDocument(collectionLink, updatedTimelineDoc,
+          requestOptions, true);
+      LOG.debug("Successfully wrote doc with id : {} and type : {} under " +
+          "Database : {}", timelineDoc.getId(), timelineDoc.getType(),
+          databaseName);
+    } catch (DocumentClientException e) {
+      if (e.getStatusCode() == 409) {
+        LOG.warn("There was a conflict while upserting, hence retrying...", e);
+        upsertDocument(collectionType, updatedTimelineDoc);
+      } else {
+        // Only report an error when no retry was attempted; the retry logs
+        // its own outcome.
+        LOG.error("Error while upserting Collection : {} with Doc Id : {} " +
+            "under Database : {}", collectionType.getCollectionName(),
+            updatedTimelineDoc.getId(), databaseName, e);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType,
+      TimelineDoc timelineDoc, StringBuilder eTagStrBuilder) {
+    TimelineDoc prevDocument = fetchLatestDoc(collectionType,
+        timelineDoc.getId(), eTagStrBuilder);
+    if (prevDocument != null) {
+      prevDocument.merge(timelineDoc);
+      timelineDoc = prevDocument;
+    }
+    return timelineDoc;
+  }
+
+  /**
+   * Reads the stored version of the document and converts it to the
+   * document class matching the collection type.
+   *
+   * @param collectionType type of the collection to read from.
+   * @param documentId id of the document to read.
+   * @param eTagStrBuilder receives the ETag of the stored document when the
+   *                       read succeeds.
+   * @return the stored document, or null when it does not exist or cannot
+   *         be read.
+   */
+  @SuppressWarnings("unchecked")
+  private TimelineDoc fetchLatestDoc(final CollectionType collectionType,
+      final String documentId, StringBuilder eTagStrBuilder) {
+    final String documentLink = String.format(DOCUMENT_LINK, databaseName,
+        collectionType.getCollectionName(), documentId);
+    try {
+      Document latestDocument = client.readDocument(documentLink, new
+          RequestOptions()).getResource();
+      TimelineDoc timelineDoc;
+      switch (collectionType) {
+      case FLOW_RUN:
+        timelineDoc = (TimelineDoc) latestDocument.toObject(
+            FlowRunDocument.class);
+        break;
+      case FLOW_ACTIVITY:
+        timelineDoc = (TimelineDoc) latestDocument.toObject(FlowActivityDocument
+            .class);
+        break;
+      default:
+        timelineDoc = (TimelineDoc) latestDocument.toObject(
+            TimelineEntityDocument.class);
+      }
+      eTagStrBuilder.append(latestDocument.getETag());
+      return timelineDoc;
+    } catch (DocumentClientException e) {
+      if (e.getStatusCode() == 404) {
+        // the expected outcome for the first write of a document
+        LOG.debug("No previous Document found with id : {} for Collection" +
+            " : {} under Database : {}", documentId, collectionType
+            .getCollectionName(), databaseName);
+      } else {
+        // a real read failure must not be reported as a missing document
+        LOG.warn("Unable to read previous Document with id : {} for " +
+            "Collection : {} under Database : {}", documentId,
+            collectionType.getCollectionName(), databaseName, e);
+      }
+      return null;
+    } catch (Exception e) {
+      // any other failure; still treated as no previous document, as
+      // before, but no longer hidden
+      LOG.warn("Unable to fetch previous Document with id : {} for " +
+          "Collection : {} under Database : {}", documentId,
+          collectionType.getCollectionName(), databaseName, e);
+      return null;
+    }
+  }
+
+  /**
+   * Closes the shared Cosmos DB client, if it is still open, and clears the
+   * static reference.
+   */
+  @Override
+  public void close() {
+    // The client is static, so guard it with the class monitor rather than
+    // the monitor of this particular writer instance.
+    synchronized (CosmosDBDocumentStoreWriter.class) {
+      if (client != null) {
+        LOG.info("Closing Cosmos DB Client...");
+        client.close();
+        client = null;
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/package-info.java
new file mode 100644
index 0000000..3f2165f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore
+ * .writer.cosmosdb contains the CosmosDB DocumentStore writer implementation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/package-info.java
new file mode 100644
index 0000000..544d321
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer
+ * contains the implementation of different DocumentStore writer clients
+ * for DocumentVendor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTestUtils.java
new file mode 100755
index 0000000..cf57085
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTestUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Loads sample timeline data for tests from classpath JSON fixtures. */
+public final class DocumentStoreTestUtils {
+
+  private DocumentStoreTestUtils(){}
+
+  /** Loads the {@link TimelineEntity} list from timeline-entities.json. */
+  public static List<TimelineEntity> bakeTimelineEntities()
+      throws IOException {
+    return JsonUtils.fromJson(readResource("documents/timeline-entities.json"),
+        new TypeReference<List<TimelineEntity>>(){});
+  }
+
+  /** Loads the entity documents from test-timeline-entities-doc.json. */
+  public static List<TimelineEntityDocument> bakeYarnAppTimelineEntities()
+      throws IOException {
+    return JsonUtils.fromJson(
+        readResource("documents/test-timeline-entities-doc.json"),
+        new TypeReference<List<TimelineEntityDocument>>() {});
+  }
+
+  /** Loads the single entity document from timeline-app-doc.json. */
+  public static TimelineEntityDocument bakeTimelineEntityDoc()
+      throws IOException {
+    return JsonUtils.fromJson(readResource("documents/timeline-app-doc.json"),
+        new TypeReference<TimelineEntityDocument>() {});
+  }
+
+  /** Loads the flow activity document from flowactivity-doc.json. */
+  public static FlowActivityDocument bakeFlowActivityDoc() throws IOException {
+    return JsonUtils.fromJson(readResource("documents/flowactivity-doc.json"),
+        new TypeReference<FlowActivityDocument>() {});
+  }
+
+  /** Loads the flow run document from flowrun-doc.json. */
+  public static FlowRunDocument bakeFlowRunDoc() throws IOException {
+    return JsonUtils.fromJson(readResource("documents/flowrun-doc.json"),
+        new TypeReference<FlowRunDocument>(){});
+  }
+
+  /** Reads a classpath resource as UTF-8 and always closes its stream. */
+  private static String readResource(String name) throws IOException {
+    ClassLoader loader = DocumentStoreTestUtils.class.getClassLoader();
+    try (java.io.InputStream in = loader.getResourceAsStream(name)) {
+      // ClassLoader.getResourceAsStream returns null for a missing resource.
+      if (in == null) {
+        throw new IOException("Test resource not found: " + name);
+      }
+      return IOUtils.toString(in, "UTF-8");
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/JsonUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/JsonUtils.java
new file mode 100755
index 0000000..c1da4f6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/JsonUtils.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+
+/**
+ * A simple util class for Json SerDe.
+ */
+public final class JsonUtils {
+
+  private JsonUtils(){}
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  static {
+    OBJECT_MAPPER.configure(
+        DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  /**
+   * Deserialize the Json String to JAVA Object.
+   * @param jsonStr
+   *               json string that has to be deserialized
+   * @param type
+   *              of JAVA Object
+   * @return JAVA Object after deserialization
+   * @throws IOException if Json String is not valid or error
+   *                     while deserialization
+   */
+  public static <T> T fromJson(final String jsonStr, final TypeReference type)
+      throws IOException {
+    return OBJECT_MAPPER.readValue(jsonStr, type);
+  }
+
+  public static ObjectMapper getObjectMapper() {
+    return OBJECT_MAPPER;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreCollectionCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreCollectionCreator.java
new file mode 100644
index 0000000..879b0ad
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreCollectionCreator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DummyDocumentStoreWriter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test case for {@link DocumentStoreCollectionCreator}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreFactory.class)
+public class TestDocumentStoreCollectionCreator {
+
+  private final DocumentStoreWriter<TimelineDocument> dummyWriter =
+      new DummyDocumentStoreWriter<>();
+  private final Configuration conf = new Configuration();
+
+  @Before
+  public void setUp() throws YarnException {
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_MASTER_KEY,
+        "1234567");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_ENDPOINT,
+        "https://localhost:443");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME,
+        "TestDB");
+    PowerMockito.mockStatic(DocumentStoreFactory.class);
+    PowerMockito.when(DocumentStoreFactory.createDocumentStoreWriter(
+        ArgumentMatchers.any(Configuration.class))).thenReturn(dummyWriter);
+  }
+
+  @Test
+  public void collectionCreatorTest() {
+    // No assertions: the test passes when schema creation does not throw.
+    new DocumentStoreCollectionCreator().createTimelineSchema(new String[0]);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineReaderImpl.java
new file mode 100755
index 0000000..181e134
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineReaderImpl.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DummyDocumentStoreReader;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Test case for {@link DocumentStoreTimelineReaderImpl}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreFactory.class)
+public class TestDocumentStoreTimelineReaderImpl {
+
+  private final DocumentStoreReader<TimelineDocument> documentStoreReader = new
+      DummyDocumentStoreReader<>();
+  private final List<TimelineEntity> entities = DocumentStoreTestUtils
+      .bakeTimelineEntities();
+  private final TimelineEntityDocument appTimelineEntity =
+      DocumentStoreTestUtils.bakeTimelineEntityDoc();
+
+  private final Configuration conf = new Configuration();
+  private final TimelineReaderContext context = new
+      TimelineReaderContext(null, null, null,
+      1L, null, null, null);
+  private final DocumentStoreTimelineReaderImpl timelineReader = new
+      DocumentStoreTimelineReaderImpl();
+
+  public TestDocumentStoreTimelineReaderImpl() throws IOException {
+  }
+
+  @Before
+  public void setUp() throws YarnException {
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME,
+        "TestDB");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_ENDPOINT,
+        "https://localhost:443");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_MASTER_KEY,
+        "1234567");
+    PowerMockito.mockStatic(DocumentStoreFactory.class);
+    PowerMockito.when(DocumentStoreFactory.createDocumentStoreReader(
+        ArgumentMatchers.any(Configuration.class)))
+        .thenReturn(documentStoreReader);
+  }
+
+  @Test(expected = YarnException.class)
+  public void testFailOnNoCosmosDBConfigs() throws Exception {
+    DocumentStoreUtils.validateCosmosDBConf(new Configuration());
+  }
+
+  @Test
+  public void testGetEntity() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+    EnumSet<TimelineReader.Field> fieldsToRetrieve = EnumSet.noneOf(
+        TimelineReader.Field.class);
+    dataToRetrieve.setFieldsToRetrieve(fieldsToRetrieve);
+
+    TimelineEntity timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+
+    Assert.assertEquals(appTimelineEntity.getCreatedTime(), timelineEntity
+        .getCreatedTime().longValue());
+    Assert.assertEquals(0, timelineEntity .getMetrics().size());
+    Assert.assertEquals(0, timelineEntity.getEvents().size());
+    Assert.assertEquals(0, timelineEntity.getConfigs().size());
+    Assert.assertEquals(appTimelineEntity.getInfo().size(),
+        timelineEntity.getInfo().size());
+  }
+
+  @Test
+  public void testGetEntityCustomField() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.METRICS);
+
+    TimelineEntity timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+
+    Assert.assertEquals(appTimelineEntity.getCreatedTime(), timelineEntity
+        .getCreatedTime().longValue());
+    Assert.assertEquals(appTimelineEntity.getMetrics().size(),
+        timelineEntity.getMetrics().size());
+    Assert.assertEquals(0, timelineEntity.getEvents().size());
+    Assert.assertEquals(0, timelineEntity.getConfigs().size());
+    Assert.assertEquals(appTimelineEntity.getInfo().size(),
+        timelineEntity.getInfo().size());
+  }
+
+  @Test
+  public void testGetEntityAllFields() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.ALL);
+
+    TimelineEntity timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+
+    Assert.assertEquals(appTimelineEntity.getCreatedTime(), timelineEntity
+        .getCreatedTime().longValue());
+    Assert.assertEquals(appTimelineEntity.getMetrics().size(),
+        timelineEntity .getMetrics().size());
+    Assert.assertEquals(appTimelineEntity.getEvents().size(),
+        timelineEntity.getEvents().size());
+    Assert.assertEquals(appTimelineEntity.getConfigs().size(),
+        timelineEntity.getConfigs().size());
+    Assert.assertEquals(appTimelineEntity.getInfo().size(),
+        timelineEntity.getInfo().size());
+  }
+
+  @Test
+  public void testGetAllEntities() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.ALL);
+
+    Set<TimelineEntity> actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().build(), dataToRetrieve);
+
+    Assert.assertEquals(entities.size(), actualEntities.size());
+  }
+
+  @Test
+  public void testGetEntitiesWithLimit() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+
+    Set<TimelineEntity> actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().entityLimit(2L).build(),
+        dataToRetrieve);
+
+    Assert.assertEquals(2, actualEntities.size());
+  }
+
+  @Test
+  public void testGetEntitiesByWindows() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+
+    Set<TimelineEntity> actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().createdTimeBegin(1533985554927L)
+            .createTimeEnd(1533985554927L).build(), dataToRetrieve);
+
+    Assert.assertEquals(1, actualEntities.size());
+  }
+
+  @Test
+  public void testGetFilteredEntities() throws Exception {
+
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.ALL);
+
+    // Get entities based on info filters.
+    TimelineFilterList infoFilterList = new TimelineFilterList();
+    infoFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL,
+            "YARN_APPLICATION_ATTEMPT_FINAL_STATUS", "SUCCEEDED"));
+    Set<TimelineEntity> actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(),
+        dataToRetrieve);
+
+    Assert.assertEquals(1, actualEntities.size());
+    // Only one entity with type YARN_APPLICATION_ATTEMPT should be returned.
+    for (TimelineEntity entity : actualEntities) {
+      if (!entity.getType().equals("YARN_APPLICATION_ATTEMPT")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+
+    // Get entities based on config filters.
+    TimelineFilterList confFilterList = new TimelineFilterList();
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL,
+            "YARN_AM_NODE_LABEL_EXPRESSION", "<DEFAULT_PARTITION>"));
+    actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().configFilters(confFilterList)
+            .build(), dataToRetrieve);
+
+    Assert.assertEquals(1, actualEntities.size());
+    // Only one entity with type YARN_APPLICATION should be returned.
+    for (TimelineEntity entity : actualEntities) {
+      if (!entity.getType().equals("YARN_APPLICATION")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+
+    // Get entities based on event filters.
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    TimelineFilterList eventFilters = new TimelineFilterList();
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL,
+            "CONTAINER_LAUNCHED"));
+    actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().eventFilters(eventFilters).build(),
+        dataToRetrieve);
+
+    Assert.assertEquals(1, actualEntities.size());
+    // Only one entity with type YARN_CONTAINER should be returned.
+    for (TimelineEntity entity : actualEntities) {
+      if (!entity.getType().equals("YARN_CONTAINER")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+
+    // Get entities based on metric filters.
+    TimelineFilterList metricFilterList = new TimelineFilterList();
+    metricFilterList.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_OR_EQUAL, "MEMORY", 150298624L));
+    actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
+            .build(), dataToRetrieve);
+
+    Assert.assertEquals(1, actualEntities.size());
+    // Only one entity with type YARN_CONTAINER should be returned.
+    for (TimelineEntity entity : actualEntities) {
+      if (!entity.getType().equals("YARN_CONTAINER")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+  }
+
+  @Test
+  public void testReadingDifferentEntityTypes() throws Exception {
+
+    timelineReader.serviceInit(conf);
+
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+
+    // reading YARN_FLOW_ACTIVITY
+    context.setEntityType(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+    TimelineEntity timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+
+    Assert.assertEquals(TimelineEntityType.YARN_FLOW_ACTIVITY.toString(),
+        timelineEntity.getType());
+
+    // reading YARN_FLOW_RUN
+    context.setEntityType(TimelineEntityType.YARN_FLOW_RUN.toString());
+    timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+
+    Assert.assertEquals(TimelineEntityType.YARN_FLOW_RUN.toString(),
+        timelineEntity.getType());
+
+    // reading YARN_APPLICATION
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+
+    Assert.assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
+        timelineEntity.getType());
+  }
+
+  @Test
+  public void testReadingAllEntityTypes() throws Exception {
+
+    timelineReader.serviceInit(conf);
+
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    Set<String> entityTypes = timelineReader.getEntityTypes(context);
+    Assert.assertTrue(entityTypes.contains(TimelineEntityType.YARN_CONTAINER
+        .toString()));
+    Assert.assertTrue(entityTypes.contains(TimelineEntityType
+        .YARN_APPLICATION_ATTEMPT.toString()));
+  }
+
+  @Test
+  public void testMetricsToRetrieve() throws Exception {
+
+    timelineReader.serviceInit(conf);
+
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.METRICS);
+    TimelineFilterList timelineFilterList = new TimelineFilterList();
+
+    //testing metrics prefix for OR condition
+    timelineFilterList.setOperator(TimelineFilterList.Operator.OR);
+    timelineFilterList.addFilter(new TimelinePrefixFilter(
+        TimelineCompareOp.EQUAL, "NOTHING"));
+    dataToRetrieve.setMetricsToRetrieve(timelineFilterList);
+
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    TimelineEntity timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+    Assert.assertEquals(0, timelineEntity.getMetrics().size());
+
+    timelineFilterList.addFilter(new TimelinePrefixFilter(
+        TimelineCompareOp.EQUAL,
+            "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED"));
+    dataToRetrieve.setMetricsToRetrieve(timelineFilterList);
+
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+    Assert.assertTrue(timelineEntity.getMetrics().size() > 0);
+
+    //testing metrics prefix for AND condition
+    timelineFilterList.setOperator(TimelineFilterList.Operator.AND);
+    timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+    Assert.assertEquals(0, timelineEntity.getMetrics().size());
+
+    dataToRetrieve.getMetricsToRetrieve().getFilterList().remove(0);
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+    Assert.assertTrue(timelineEntity.getMetrics().size() > 0);
+  }
+
+  @Test
+  public void testConfigsToRetrieve() throws Exception {
+
+    timelineReader.serviceInit(conf);
+
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.CONFIGS);
+    TimelineFilterList timelineFilterList = new TimelineFilterList();
+
+    //testing metrics prefix for OR condition
+    timelineFilterList.setOperator(TimelineFilterList.Operator.OR);
+    timelineFilterList.addFilter(new TimelinePrefixFilter(
+        TimelineCompareOp.EQUAL, "NOTHING"));
+    dataToRetrieve.setConfsToRetrieve(timelineFilterList);
+
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    TimelineEntity timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+    Assert.assertEquals(0, timelineEntity.getConfigs().size());
+
+    timelineFilterList.addFilter(new TimelinePrefixFilter(
+        TimelineCompareOp.EQUAL, "YARN_AM_NODE_LABEL_EXPRESSION"));
+    dataToRetrieve.setConfsToRetrieve(timelineFilterList);
+
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+    Assert.assertTrue(timelineEntity.getConfigs().size() > 0);
+
+    //testing metrics prefix for AND condition
+    timelineFilterList.setOperator(TimelineFilterList.Operator.AND);
+    timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+    Assert.assertEquals(0, timelineEntity.getConfigs().size());
+
+    dataToRetrieve.getConfsToRetrieve().getFilterList().remove(0);
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    timelineEntity = timelineReader.getEntity(context,
+        dataToRetrieve);
+    Assert.assertTrue(timelineEntity.getConfigs().size() > 0);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineWriterImpl.java
new file mode 100755
index 0000000..b654de8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineWriterImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DummyDocumentStoreWriter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test case for {@link DocumentStoreTimelineWriterImpl}.
+ *
+ * <p>{@link DocumentStoreFactory} is statically mocked so that creating a
+ * writer returns a {@link DummyDocumentStoreWriter}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreFactory.class)
+public class TestDocumentStoreTimelineWriterImpl {
+
+  private final DocumentStoreWriter<TimelineDocument> documentStoreWriter = new
+      DummyDocumentStoreWriter<>();
+  private final Configuration conf = new Configuration();
+
+  /**
+   * Fills the configuration with dummy database settings and stubs
+   * {@link DocumentStoreFactory#createDocumentStoreWriter(Configuration)} so
+   * that it returns the dummy writer.
+   */
+  @Before
+  public void setUp() throws YarnException {
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME,
+        "TestDB");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_ENDPOINT,
+        "https://localhost:443");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_MASTER_KEY,
+        "1234567");
+    PowerMockito.mockStatic(DocumentStoreFactory.class);
+    PowerMockito.when(DocumentStoreFactory.createDocumentStoreWriter(
+        ArgumentMatchers.any(Configuration.class)))
+        .thenReturn(documentStoreWriter);
+  }
+
+  /**
+   * Validating a freshly created configuration is expected to fail with a
+   * {@link YarnException}.
+   */
+  @Test(expected = YarnException.class)
+  public void testFailOnNoCosmosDBConfigs() throws Exception {
+    DocumentStoreUtils.validateCosmosDBConf(new Configuration());
+  }
+
+  /**
+   * Initializes the writer, writes a batch of entities and then verifies that
+   * a document store writer was requested from the mocked factory.
+   */
+  @Test
+  public void testWritingToCosmosDB() throws Exception {
+    DocumentStoreTimelineWriterImpl timelineWriter = new
+        DocumentStoreTimelineWriterImpl();
+
+    timelineWriter.serviceInit(conf);
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntities(DocumentStoreTestUtils.bakeTimelineEntities());
+    entities.addEntity(DocumentStoreTestUtils.bakeTimelineEntityDoc()
+        .fetchTimelineEntity());
+
+    TimelineCollectorContext context = new TimelineCollectorContext();
+    context.setFlowName("TestFlow");
+    context.setAppId("DUMMY_APP_ID");
+    context.setClusterId("yarn_cluster");
+    context.setUserId("test_user");
+    timelineWriter.write(context, entities,
+        UserGroupInformation.createRemoteUser("test_user"));
+
+    // verifyStatic() only arms verification for the next static call on the
+    // mocked class, so the invocation being checked has to follow it.
+    PowerMockito.verifyStatic(DocumentStoreFactory.class,
+        Mockito.atLeastOnce());
+    DocumentStoreFactory.createDocumentStoreWriter(
+        ArgumentMatchers.any(Configuration.class));
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/TestDocumentOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/TestDocumentOperations.java
new file mode 100755
index 0000000..0779431
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/TestDocumentOperations.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineMetricSubDoc;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Timeline Entity Document merge and aggregation test.
+ */
+public class TestDocumentOperations {
+
+  private static final String MEMORY_ID = "MEMORY";
+  private static final String FLOW_NAME = "DistributedShell";
+  private static final String FLOW_VERSION = "1";
+
+  /**
+   * Merging a baked entity document into a freshly created one must leave
+   * both documents with the same number of info, metric, event, config and
+   * relation entries.
+   */
+  @Test
+  public void testTimelineEntityDocMergeOperation() throws IOException {
+    TimelineEntityDocument actualEntityDoc =
+        new TimelineEntityDocument();
+    TimelineEntityDocument expectedEntityDoc =
+        DocumentStoreTestUtils.bakeTimelineEntityDoc();
+
+    Assert.assertEquals(1, actualEntityDoc.getInfo().size());
+    Assert.assertEquals(0, actualEntityDoc.getMetrics().size());
+    Assert.assertEquals(0, actualEntityDoc.getEvents().size());
+    Assert.assertEquals(0, actualEntityDoc.getConfigs().size());
+    Assert.assertEquals(0, actualEntityDoc.getIsRelatedToEntities().size());
+    Assert.assertEquals(0, actualEntityDoc.getRelatesToEntities().size());
+
+    actualEntityDoc.merge(expectedEntityDoc);
+
+    Assert.assertEquals(expectedEntityDoc.getInfo().size(),
+        actualEntityDoc.getInfo().size());
+    Assert.assertEquals(expectedEntityDoc.getMetrics().size(),
+        actualEntityDoc.getMetrics().size());
+    Assert.assertEquals(expectedEntityDoc.getEvents().size(),
+        actualEntityDoc.getEvents().size());
+    Assert.assertEquals(expectedEntityDoc.getConfigs().size(),
+        actualEntityDoc.getConfigs().size());
+    // Compare like with like: isRelatedTo against isRelatedTo.
+    Assert.assertEquals(expectedEntityDoc.getIsRelatedToEntities().size(),
+        actualEntityDoc.getIsRelatedToEntities().size());
+    Assert.assertEquals(expectedEntityDoc.getRelatesToEntities().size(),
+        actualEntityDoc.getRelatesToEntities().size());
+  }
+
+  /**
+   * Merging a flow activity document into a freshly created one must copy its
+   * fields, and merging again after a further flow activity was added must
+   * keep both documents in step.
+   */
+  @Test
+  public void testFlowActivityDocMergeOperation() throws IOException {
+    FlowActivityDocument actualFlowActivityDoc = new FlowActivityDocument();
+    FlowActivityDocument expectedFlowActivityDoc =
+        DocumentStoreTestUtils.bakeFlowActivityDoc();
+
+    Assert.assertEquals(0, actualFlowActivityDoc.getDayTimestamp());
+    Assert.assertEquals(0, actualFlowActivityDoc.getFlowActivities().size());
+    Assert.assertNull(actualFlowActivityDoc.getFlowName());
+    Assert.assertEquals(TimelineEntityType.YARN_FLOW_ACTIVITY.toString(),
+        actualFlowActivityDoc.getType());
+    Assert.assertNull(actualFlowActivityDoc.getUser());
+    Assert.assertNull(actualFlowActivityDoc.getId());
+
+    actualFlowActivityDoc.merge(expectedFlowActivityDoc);
+
+    Assert.assertEquals(expectedFlowActivityDoc.getDayTimestamp(),
+        actualFlowActivityDoc.getDayTimestamp());
+    Assert.assertEquals(expectedFlowActivityDoc.getFlowActivities().size(),
+        actualFlowActivityDoc.getFlowActivities().size());
+    Assert.assertEquals(expectedFlowActivityDoc.getFlowName(),
+        actualFlowActivityDoc.getFlowName());
+    Assert.assertEquals(expectedFlowActivityDoc.getType(),
+        actualFlowActivityDoc.getType());
+    Assert.assertEquals(expectedFlowActivityDoc.getUser(),
+        actualFlowActivityDoc.getUser());
+    Assert.assertEquals(expectedFlowActivityDoc.getId(),
+        actualFlowActivityDoc.getId());
+
+    expectedFlowActivityDoc.addFlowActivity(FLOW_NAME,
+        FLOW_VERSION, System.currentTimeMillis());
+
+    actualFlowActivityDoc.merge(expectedFlowActivityDoc);
+
+    Assert.assertEquals(expectedFlowActivityDoc.getDayTimestamp(),
+        actualFlowActivityDoc.getDayTimestamp());
+    Assert.assertEquals(expectedFlowActivityDoc.getFlowActivities().size(),
+        actualFlowActivityDoc.getFlowActivities().size());
+    Assert.assertEquals(expectedFlowActivityDoc.getFlowName(),
+        actualFlowActivityDoc.getFlowName());
+    Assert.assertEquals(expectedFlowActivityDoc.getType(),
+        actualFlowActivityDoc.getType());
+    Assert.assertEquals(expectedFlowActivityDoc.getUser(),
+        actualFlowActivityDoc.getUser());
+    Assert.assertEquals(expectedFlowActivityDoc.getId(),
+        actualFlowActivityDoc.getId());
+  }
+
+  /**
+   * Merging a flow run document into a freshly created one must copy its
+   * fields; merging the same document a second time must aggregate the
+   * {@code SUM} metric, doubling its value.
+   */
+  @Test
+  public void testFlowRunDocMergeAndAggOperation() throws IOException {
+    FlowRunDocument actualFlowRunDoc = new FlowRunDocument();
+    FlowRunDocument expectedFlowRunDoc = DocumentStoreTestUtils
+        .bakeFlowRunDoc();
+
+    final long timestamp = System.currentTimeMillis();
+    final long value = 98586624;
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setId(MEMORY_ID);
+    timelineMetric.setType(TimelineMetric.Type.SINGLE_VALUE);
+    timelineMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+    timelineMetric.addValue(timestamp, value);
+    TimelineMetricSubDoc metricSubDoc = new TimelineMetricSubDoc(
+        timelineMetric);
+    expectedFlowRunDoc.getMetrics().put(MEMORY_ID, metricSubDoc);
+
+    Assert.assertNull(actualFlowRunDoc.getClusterId());
+    Assert.assertNull(actualFlowRunDoc.getFlowName());
+    Assert.assertNull(actualFlowRunDoc.getFlowRunId());
+    Assert.assertNull(actualFlowRunDoc.getFlowVersion());
+    Assert.assertNull(actualFlowRunDoc.getId());
+    Assert.assertNull(actualFlowRunDoc.getUsername());
+    Assert.assertEquals(TimelineEntityType.YARN_FLOW_RUN.toString(),
+        actualFlowRunDoc.getType());
+    Assert.assertEquals(0, actualFlowRunDoc.getMinStartTime());
+    Assert.assertEquals(0, actualFlowRunDoc.getMaxEndTime());
+    Assert.assertEquals(0, actualFlowRunDoc.getMetrics().size());
+
+    actualFlowRunDoc.merge(expectedFlowRunDoc);
+
+    Assert.assertEquals(expectedFlowRunDoc.getClusterId(),
+        actualFlowRunDoc.getClusterId());
+    Assert.assertEquals(expectedFlowRunDoc.getFlowName(),
+        actualFlowRunDoc.getFlowName());
+    Assert.assertEquals(expectedFlowRunDoc.getFlowRunId(),
+        actualFlowRunDoc.getFlowRunId());
+    Assert.assertEquals(expectedFlowRunDoc.getFlowVersion(),
+        actualFlowRunDoc.getFlowVersion());
+    Assert.assertEquals(expectedFlowRunDoc.getId(), actualFlowRunDoc.getId());
+    Assert.assertEquals(expectedFlowRunDoc.getUsername(),
+        actualFlowRunDoc.getUsername());
+    Assert.assertEquals(expectedFlowRunDoc.getType(),
+        actualFlowRunDoc.getType());
+    Assert.assertEquals(expectedFlowRunDoc.getMinStartTime(),
+        actualFlowRunDoc.getMinStartTime());
+    Assert.assertEquals(expectedFlowRunDoc.getMaxEndTime(),
+        actualFlowRunDoc.getMaxEndTime());
+    Assert.assertEquals(expectedFlowRunDoc.getMetrics().size(),
+        actualFlowRunDoc.getMetrics().size());
+
+    actualFlowRunDoc.merge(expectedFlowRunDoc);
+
+    Assert.assertEquals(value + value, actualFlowRunDoc.getMetrics()
+        .get(MEMORY_ID).getSingleDataValue());
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DummyDocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DummyDocumentStoreReader.java
new file mode 100755
index 0000000..1317392
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DummyDocumentStoreReader.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Dummy Document Store Reader for mocking backend calls for unit test.
+ *
+ * <p>All documents are baked once, at construction time, through
+ * {@link DocumentStoreTestUtils}; the collection name and document class
+ * passed to the read methods are ignored.
+ */
+public class DummyDocumentStoreReader<TimelineDoc extends TimelineDocument>
+    implements DocumentStoreReader<TimelineDoc> {
+
+  private final TimelineEntityDocument entityDoc;
+  private final List<TimelineEntityDocument> entityDocs;
+  private final FlowRunDocument flowRunDoc;
+  private final FlowActivityDocument flowActivityDoc;
+
+  /**
+   * Bakes the canned documents served by this reader.
+   *
+   * @throws RuntimeException if baking a document fails with an IOException
+   */
+  public DummyDocumentStoreReader() {
+    try {
+      entityDoc = DocumentStoreTestUtils.bakeTimelineEntityDoc();
+      entityDocs = DocumentStoreTestUtils.bakeYarnAppTimelineEntities();
+      flowRunDoc = DocumentStoreTestUtils.bakeFlowRunDoc();
+      flowActivityDoc = DocumentStoreTestUtils.bakeFlowActivityDoc();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create " +
+          "DummyDocumentStoreReader : ", e);
+    }
+  }
+
+  /**
+   * Returns the canned document matching the entity type of the context.
+   *
+   * @param collectionName ignored
+   * @param context reader context whose entity type selects the document
+   * @param docClass ignored
+   * @return the flow activity or flow run document for those entity types,
+   *         the entity document for any other type
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public TimelineDoc readDocument(String collectionName, TimelineReaderContext
+      context, Class<TimelineDoc> docClass) {
+    switch (TimelineEntityType.valueOf(context.getEntityType())) {
+    case YARN_FLOW_ACTIVITY:
+      return (TimelineDoc) flowActivityDoc;
+    case YARN_FLOW_RUN:
+      return (TimelineDoc) flowRunDoc;
+    default:
+      return (TimelineDoc) entityDoc;
+    }
+  }
+
+  /**
+   * Returns at most {@code size} canned documents for the entity type of the
+   * context.
+   *
+   * @param collectionName ignored
+   * @param context reader context whose entity type selects the documents
+   * @param docClass ignored
+   * @param size maximum number of documents to return; {@code -1} means no
+   *             limit
+   * @return a single-document list for flow activity, flow run and
+   *         application types, the baked entity list otherwise, truncated to
+   *         {@code size}
+   */
+  @Override
+  public List<TimelineDoc> readDocumentList(String collectionName,
+      TimelineReaderContext context, Class<TimelineDoc> docClass, long size) {
+    switch (TimelineEntityType.valueOf(context.getEntityType())) {
+    case YARN_FLOW_ACTIVITY:
+      return limit(listOf(flowActivityDoc), size);
+    case YARN_FLOW_RUN:
+      return limit(listOf(flowRunDoc), size);
+    case YARN_APPLICATION:
+      return limit(listOf(entityDoc), size);
+    default:
+      return limit(entityDocs, size);
+    }
+  }
+
+  /**
+   * Returns the distinct types of the baked entity documents.
+   *
+   * @param collectionName ignored
+   * @param context ignored
+   */
+  @Override
+  public Set<String> fetchEntityTypes(String collectionName,
+      TimelineReaderContext context) {
+    return entityDocs.stream().map(TimelineEntityDocument::getType)
+        .collect(Collectors.toSet());
+  }
+
+  /** No-op: this dummy reader has nothing to release. */
+  @Override
+  public void close() {
+  }
+
+  /** Wraps a single document in a new mutable list. */
+  private static <T> List<T> listOf(T doc) {
+    List<T> docs = new ArrayList<>();
+    docs.add(doc);
+    return docs;
+  }
+
+  /**
+   * Truncates {@code docs} to at most {@code size} elements. A size of
+   * {@code -1} or one larger than the list returns every document, so that
+   * all entity types treat the "no limit" value the same way.
+   */
+  @SuppressWarnings("unchecked")
+  private List<TimelineDoc> limit(List<?> docs, long size) {
+    int end = (size == -1 || size > docs.size()) ? docs.size() : (int) size;
+    return (List<TimelineDoc>) docs.subList(0, end);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DummyDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DummyDocumentStoreWriter.java
new file mode 100755
index 0000000..764437a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DummyDocumentStoreWriter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer;
+
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+
+/**
+ * Dummy Document Store Writer for mocking backend calls for unit test.
+ *
+ * <p>Every operation is a no-op, so nothing is created or persisted.
+ */
+public class DummyDocumentStoreWriter<TimelineDoc extends TimelineDocument>
+    implements DocumentStoreWriter<TimelineDoc> {
+
+  /** Does nothing; no database is created. */
+  @Override
+  public void createDatabase() {
+    // intentionally empty
+  }
+
+  /** Does nothing; {@code collectionName} is ignored. */
+  @Override
+  public void createCollection(String collectionName) {
+    // intentionally empty
+  }
+
+  /** Does nothing; the document is discarded. */
+  @Override
+  public void writeDocument(TimelineDoc timelineDocument,
+      CollectionType collectionType) {
+    // intentionally empty
+  }
+
+  /** Does nothing; there is no resource to release. */
+  @Override
+  public void close() {
+    // intentionally empty
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowactivity-doc.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowactivity-doc.json
new file mode 100755
index 0000000..aa0261d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowactivity-doc.json
@@ -0,0 +1,20 @@
+{
+  "id": "yarn_cluster!1533859200000!test_user!DistributedShell",
+  "type": "YARN_FLOW_ACTIVITY",
+  "flowActivities": [
+    {
+      "flowName": "DistributedShell",
+      "flowVersion": "1",
+      "flowRunId": 1533871039026
+    },
+    {
+      "flowName": "DistributedShell",
+      "flowVersion": "1",
+      "flowRunId": 1533871599510
+    }
+  ],
+  "dayTimestamp": 1533859200000,
+  "user": "test_user",
+  "flowName": "DistributedShell",
+  "createdTime": 1533859200000000
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowrun-doc.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowrun-doc.json
new file mode 100755
index 0000000..79be6b1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowrun-doc.json
@@ -0,0 +1,126 @@
+{
+  "id": "yarn_cluster!test_user!DistributedShell!1533871599510",
+  "type": "YARN_FLOW_RUN",
+  "clusterId": "yarn_cluster",
+  "username": "test_user",
+  "flowName": "DistributedShell",
+  "flowRunId": 1533871599510,
+  "flowVersion": "1",
+  "minStartTime": 1533871599510,
+  "maxEndTime": 1533871614645,
+  "metrics": {
+    "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED": {
+      "valid": true,
+      "singleDataTimestamp": 1533871614645,
+      "singleDataValue": 0,
+      "id": "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED",
+      "type": "SINGLE_VALUE",
+      "values": {
+        "1533871614645": 0
+      },
+      "realtimeAggregationOp": "NOP",
+      "valuesJAXB": {
+        "1533871614645": 0
+      }
+    },
+    "YARN_APPLICATION_RESOURCE_PREEMPTED_CPU": {
+      "valid": true,
+      "singleDataTimestamp": 1533871614645,
+      "singleDataValue": 0,
+      "id": "YARN_APPLICATION_RESOURCE_PREEMPTED_CPU",
+      "type": "SINGLE_VALUE",
+      "values": {
+        "1533871614645": 0
+      },
+      "realtimeAggregationOp": "NOP",
+      "valuesJAXB": {
+        "1533871614645": 0
+      }
+    },
+    "YARN_APPLICATION_CPU_PREEMPT_METRIC": {
+      "valid": true,
+      "singleDataTimestamp": 1533871614645,
+      "singleDataValue": 0,
+      "id": "YARN_APPLICATION_CPU_PREEMPT_METRIC",
+      "type": "SINGLE_VALUE",
+      "values": {
+        "1533871614645": 0
+      },
+      "realtimeAggregationOp": "NOP",
+      "valuesJAXB": {
+        "1533871614645": 0
+      }
+    },
+    "YARN_APPLICATION_MEMORY": {
+      "valid": true,
+      "singleDataTimestamp": 1533871614645,
+      "singleDataValue": 19858,
+      "id": "YARN_APPLICATION_MEMORY",
+      "type": "SINGLE_VALUE",
+      "values": {
+        "1533871614645": 19858
+      },
+      "realtimeAggregationOp": "NOP",
+      "valuesJAXB": {
+        "1533871614645": 19858
+      }
+    },
+    "YARN_APPLICATION_MEM_PREEMPT_METRIC": {
+      "valid": true,
+      "singleDataTimestamp": 1533871614645,
+      "singleDataValue": 0,
+      "id": "YARN_APPLICATION_MEM_PREEMPT_METRIC",
+      "type": "SINGLE_VALUE",
+      "values": {
+        "1533871614645": 0
+      },
+      "realtimeAggregationOp": "NOP",
+      "valuesJAXB": {
+        "1533871614645": 0
+      }
+    },
+    "YARN_APPLICATION_CPU": {
+      "valid": true,
+      "singleDataTimestamp": 1533871614645,
+      "singleDataValue": 19,
+      "id": "YARN_APPLICATION_CPU",
+      "type": "SINGLE_VALUE",
+      "values": {
+        "1533871614645": 19
+      },
+      "realtimeAggregationOp": "NOP",
+      "valuesJAXB": {
+        "1533871614645": 19
+      }
+    },
+    "YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY": {
+      "valid": true,
+      "singleDataTimestamp": 1533871614645,
+      "singleDataValue": 0,
+      "id": "YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY",
+      "type": "SINGLE_VALUE",
+      "values": {
+        "1533871614645": 0
+      },
+      "realtimeAggregationOp": "NOP",
+      "valuesJAXB": {
+        "1533871614645": 0
+      }
+    },
+    "YARN_APPLICATION_AM_CONTAINER_PREEMPTED": {
+      "valid": true,
+      "singleDataTimestamp": 1533871614645,
+      "singleDataValue": 0,
+      "id": "YARN_APPLICATION_AM_CONTAINER_PREEMPTED",
+      "type": "SINGLE_VALUE",
+      "values": {
+        "1533871614645": 0
+      },
+      "realtimeAggregationOp": "NOP",
+      "valuesJAXB": {
+        "1533871614645": 0
+      }
+    }
+  },
+  "createdTime": 1533871599510
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/test-timeline-entities-doc.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/test-timeline-entities-doc.json
new file mode 100755
index 0000000..c2cd8c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/test-timeline-entities-doc.json
@@ -0,0 +1,185 @@
+[
+  {
+    "context": {
+      "clusterId": "yarn_cluster",
+      "userId": "test_user",
+      "flowName": "DistributedShell",
+      "flowRunId": 1533985547564,
+      "appId": "application_1533985489663_0001"
+    },
+    "flowVersion": "1",
+    "subApplicationUser": "test_user",
+    "metrics": {},
+    "events": {
+      "YARN_RM_CONTAINER_REQUESTED_TYPE": [
+        {
+          "valid": true,
+          "id": "YARN_RM_CONTAINER_REQUESTED_TYPE",
+          "timestamp": 1533985547824,
+          "info": {
+            "YARN_RM_CONTAINER_ALLOCATION_REQUEST_ID": 0
+          }
+        }
+      ],
+      "YARN_APPLICATION_ATTEMPT_FINISHED": [
+        {
+          "valid": true,
+          "id": "YARN_APPLICATION_ATTEMPT_FINISHED",
+          "timestamp": 1533985561254,
+          "info": {}
+        }
+      ],
+      "YARN_APPLICATION_ATTEMPT_REGISTERED": [
+        {
+          "valid": true,
+          "id": "YARN_APPLICATION_ATTEMPT_REGISTERED",
+          "timestamp": 1533985554927,
+          "info": {}
+        }
+      ]
+    },
+    "id": "yarn_cluster!test_user!DistributedShell!1533985547564!application_1533985489663_0001!YARN_APPLICATION_ATTEMPT!appattempt_1533985489663_0001_000001",
+    "type": "YARN_APPLICATION_ATTEMPT",
+    "configs": {},
+    "info": {
+      "SYSTEM_INFO_PARENT_ENTITY": {
+        "type": "YARN_APPLICATION",
+        "id": "application_1533985489663_0001"
+      },
+      "YARN_APPLICATION_ATTEMPT_HOST": "test_user/10.171.19.25",
+      "YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER": "container_1533985489663_0001_01_000001",
+      "YARN_APPLICATION_ATTEMPT_ORIGINAL_TRACKING_URL": "N/A",
+      "YARN_APPLICATION_ATTEMPT_RPC_PORT": -1,
+      "YARN_APPLICATION_ATTEMPT_TRACKING_URL": "http://test_user:8088/proxy/application_1533985489663_0001/",
+      "YARN_APPLICATION_ATTEMPT_DIAGNOSTICS_INFO": "",
+      "YARN_APPLICATION_ATTEMPT_STATE": "FINISHED",
+      "YARN_APPLICATION_ATTEMPT_FINAL_STATUS": "SUCCEEDED"
+    },
+    "createdTime": 1533985554927,
+    "relatesToEntities": {},
+    "isRelatedToEntities": {}
+  },
+  {
+    "context": {
+      "clusterId": "yarn_cluster",
+      "userId": "test_user",
+      "flowName": "DistributedShell",
+      "flowRunId": 1533985547564,
+      "appId": "application_1533985489663_0001"
+    },
+    "flowVersion": "1",
+    "subApplicationUser": "test_user",
+    "metrics": {
+      "MEMORY": [
+        {
+          "valid": true,
+          "singleDataTimestamp": 1533985556335,
+          "singleDataValue": 150298624,
+          "id": "MEMORY",
+          "type": "SINGLE_VALUE",
+          "realtimeAggregationOp": "SUM",
+          "values": {
+            "1533985556335": 150298624
+          },
+          "valuesJAXB": {
+            "1533985556335": 150298624
+          }
+        }
+      ]
+    },
+    "events": {
+      "YARN_RM_CONTAINER_CREATED": [
+        {
+          "valid": true,
+          "id": "YARN_RM_CONTAINER_CREATED",
+          "timestamp": 1533985548047,
+          "info": {
+            "YARN_RM_CONTAINER_ALLOCATION_REQUEST_ID": 0
+          }
+        }
+      ],
+      "YARN_CONTAINER_CREATED": [
+        {
+          "valid": true,
+          "id": "YARN_CONTAINER_CREATED",
+          "timestamp": 1533985549474,
+          "info": {
+            "YARN_NM_EVENT_SOURCE": "CONTAINER_EVENT",
+            "YARN_CONTAINER_STATE": "NEW",
+            "YARN_APPLICATION_STATE": "RUNNING"
+          }
+        }
+      ],
+      "YARN_CONTAINER_FINISHED": [
+        {
+          "valid": true,
+          "id": "YARN_CONTAINER_FINISHED",
+          "timestamp": 1533985560616,
+          "info": {
+            "YARN_NM_EVENT_SOURCE": "CONTAINER_EVENT"
+          }
+        }
+      ]
+    },
+    "id": "yarn_cluster!test_user!DistributedShell!1533985547564!application_1533985489663_0001!YARN_CONTAINER!container_1533985489663_0001_01_000001",
+    "type": "YARN_CONTAINER",
+    "configs": {},
+    "info": {
+      "YARN_CONTAINER_ALLOCATED_PORT": 13076,
+      "YARN_CONTAINER_ALLOCATED_MEMORY": 1024,
+      "YARN_CONTAINER_ALLOCATED_PRIORITY": "0",
+      "YARN_CONTAINER_ALLOCATED_HOST": "test_user",
+      "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS": "test_user:8042",
+      "YARN_CONTAINER_ALLOCATED_VCORE": 1,
+      "SYSTEM_INFO_PARENT_ENTITY": {
+        "type": "YARN_APPLICATION_ATTEMPT",
+        "id": "appattempt_1533985489663_0001_000001"
+      },
+      "YARN_CONTAINER_STATE": "COMPLETE",
+      "YARN_CONTAINER_EXIT_STATUS": 0,
+      "YARN_CONTAINER_DIAGNOSTICS_INFO": "",
+      "YARN_CONTAINER_FINISHED_TIME": 1533985560616
+    },
+    "createdTime": 1533985549474,
+    "relatesToEntities": {},
+    "isRelatedToEntities": {}
+  },
+  {
+    "context": {
+      "clusterId": "yarn_cluster",
+      "userId": "test_user",
+      "flowName": "DistributedShell",
+      "flowRunId": 1533985547564,
+      "appId": "application_1533985489663_0001"
+    },
+    "flowVersion": "1",
+    "subApplicationUser": "test_user",
+    "metrics": {},
+    "events": {
+      "CONTAINER_LAUNCHED": [
+        {
+          "valid": true,
+          "id": "CONTAINER_LAUNCHED",
+          "timestamp": 1533985557747,
+          "info": {
+            "YARN_NM_EVENT_SOURCE": "CONTAINER_EVENT",
+            "YARN_CONTAINER_STATE": "LOCALIZED",
+            "YARN_APPLICATION_STATE": "RUNNING"
+          }
+        }
+      ]
+    },
+    "id": "yarn_cluster!test_user!DistributedShell!1533985547564!application_1533985489663_0001!YARN_CONTAINER!container_1533985489663_0001_01_000002",
+    "type": "YARN_CONTAINER",
+    "configs": {},
+    "info": {
+      "SYSTEM_INFO_PARENT_ENTITY": {
+        "type": "YARN_APPLICATION_ATTEMPT",
+        "id": "appattempt_1533985489663_0001_000001"
+      }
+    },
+    "createdTime": 0,
+    "relatesToEntities": {},
+    "isRelatedToEntities": {}
+  }
+]
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-app-doc.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-app-doc.json
new file mode 100755
index 0000000..510ed47
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-app-doc.json
@@ -0,0 +1,203 @@
+{
+  "context": {
+    "clusterId": "yarn_cluster",
+    "userId": "test_user",
+    "flowName": "DistributedShell",
+    "flowRunId": 1533871599510,
+    "appId": "application_1533871545292_0001"
+  },
+  "flowVersion": "1",
+  "subApplicationUser": "test_user",
+  "metrics": {
+    "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED": [
+      {
+        "valid": true,
+        "singleDataTimestamp": 1533871614645,
+        "singleDataValue": 0,
+        "id": "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED",
+        "type": "SINGLE_VALUE",
+        "values": {
+          "1533871614645": 0
+        },
+        "realtimeAggregationOp": "NOP",
+        "valuesJAXB": {
+          "1533871614645": 0
+        }
+      }
+    ],
+    "YARN_APPLICATION_RESOURCE_PREEMPTED_CPU": [
+      {
+        "valid": true,
+        "singleDataTimestamp": 1533871614645,
+        "singleDataValue": 0,
+        "id": "YARN_APPLICATION_RESOURCE_PREEMPTED_CPU",
+        "type": "SINGLE_VALUE",
+        "values": {
+          "1533871614645": 0
+        },
+        "realtimeAggregationOp": "NOP",
+        "valuesJAXB": {
+          "1533871614645": 0
+        }
+      }
+    ],
+    "YARN_APPLICATION_CPU_PREEMPT_METRIC": [
+      {
+        "valid": true,
+        "singleDataTimestamp": 1533871614645,
+        "singleDataValue": 0,
+        "id": "YARN_APPLICATION_CPU_PREEMPT_METRIC",
+        "type": "SINGLE_VALUE",
+        "values": {
+          "1533871614645": 0
+        },
+        "realtimeAggregationOp": "NOP",
+        "valuesJAXB": {
+          "1533871614645": 0
+        }
+      }
+    ],
+    "YARN_APPLICATION_MEMORY": [
+      {
+        "valid": true,
+        "singleDataTimestamp": 1533871614645,
+        "singleDataValue": 19858,
+        "id": "YARN_APPLICATION_MEMORY",
+        "type": "SINGLE_VALUE",
+        "values": {
+          "1533871614645": 19858
+        },
+        "realtimeAggregationOp": "NOP",
+        "valuesJAXB": {
+          "1533871614645": 19858
+        }
+      }
+    ],
+    "YARN_APPLICATION_MEM_PREEMPT_METRIC": [
+      {
+        "valid": true,
+        "singleDataTimestamp": 1533871614645,
+        "singleDataValue": 0,
+        "id": "YARN_APPLICATION_MEM_PREEMPT_METRIC",
+        "type": "SINGLE_VALUE",
+        "values": {
+          "1533871614645": 0
+        },
+        "realtimeAggregationOp": "NOP",
+        "valuesJAXB": {
+          "1533871614645": 0
+        }
+      }
+    ],
+    "YARN_APPLICATION_CPU": [
+      {
+        "valid": true,
+        "singleDataTimestamp": 1533871614645,
+        "singleDataValue": 19,
+        "id": "YARN_APPLICATION_CPU",
+        "type": "SINGLE_VALUE",
+        "values": {
+          "1533871614645": 19
+        },
+        "realtimeAggregationOp": "NOP",
+        "valuesJAXB": {
+          "1533871614645": 19
+        }
+      }
+    ],
+    "YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY": [
+      {
+        "valid": true,
+        "singleDataTimestamp": 1533871614645,
+        "singleDataValue": 0,
+        "id": "YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY",
+        "type": "SINGLE_VALUE",
+        "values": {
+          "1533871614645": 0
+        },
+        "realtimeAggregationOp": "NOP",
+        "valuesJAXB": {
+          "1533871614645": 0
+        }
+      }
+    ],
+    "YARN_APPLICATION_AM_CONTAINER_PREEMPTED": [
+      {
+        "valid": true,
+        "singleDataTimestamp": 1533871614645,
+        "singleDataValue": 0,
+        "id": "YARN_APPLICATION_AM_CONTAINER_PREEMPTED",
+        "type": "SINGLE_VALUE",
+        "values": {
+          "1533871614645": 0
+        },
+        "realtimeAggregationOp": "NOP",
+        "valuesJAXB": {
+          "1533871614645": 0
+        }
+      }
+    ]
+  },
+  "events": {
+    "YARN_APPLICATION_CREATED": [
+      {
+        "valid": true,
+        "id": "YARN_APPLICATION_CREATED",
+        "timestamp": 1533871599510,
+        "info": {}
+      }
+    ],
+    "YARN_APPLICATION_ACLS_UPDATED": [
+      {
+        "valid": true,
+        "id": "YARN_APPLICATION_ACLS_UPDATED",
+        "timestamp": 1533871599671,
+        "info": {}
+      }
+    ],
+    "YARN_APPLICATION_STATE_UPDATED": [
+      {
+        "valid": true,
+        "id": "YARN_APPLICATION_STATE_UPDATED",
+        "timestamp": 1533871608094,
+        "info": {
+          "YARN_APPLICATION_STATE": "RUNNING"
+        }
+      }
+    ],
+    "YARN_APPLICATION_FINISHED": [
+      {
+        "valid": true,
+        "id": "YARN_APPLICATION_FINISHED",
+        "timestamp": 1533871614645,
+        "info": {}
+      }
+    ]
+  },
+  "id": "yarn_cluster!test_user!DistributedShell!1533871599510!application_1533871545292_0001!YARN_APPLICATION!application_1533871545292_0001",
+  "type": "YARN_APPLICATION",
+  "createdTime": 1533871599510,
+  "info": {
+    "YARN_APPLICATION_VIEW_ACLS": "",
+    "YARN_APPLICATION_SUBMITTED_TIME": 1533871599446,
+    "YARN_AM_CONTAINER_LAUNCH_COMMAND": [
+      "{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr "
+    ],
+    "YARN_APPLICATION_NAME": "DistributedShell",
+    "YARN_APPLICATION_USER": "test_user",
+    "YARN_APPLICATION_QUEUE": "default",
+    "YARN_APPLICATION_TYPE": "YARN",
+    "YARN_APPLICATION_UNMANAGED_APPLICATION": false,
+    "YARN_APPLICATION_TAGS": [],
+    "YARN_APPLICATION_STATE": "FINISHED",
+    "YARN_APPLICATION_DIAGNOSTICS_INFO": "",
+    "YARN_APPLICATION_FINAL_STATUS": "SUCCEEDED",
+    "YARN_APPLICATION_LATEST_APP_ATTEMPT": "appattempt_1533871545292_0001_000001"
+  },
+  "configs": {
+    "YARN_AM_NODE_LABEL_EXPRESSION": "<DEFAULT_PARTITION>",
+    "YARN_APP_NODE_LABEL_EXPRESSION": "<Not set>"
+  },
+  "relatesToEntities": {},
+  "isRelatedToEntities": {}
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-entities.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-entities.json
new file mode 100755
index 0000000..9980c64
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-entities.json
@@ -0,0 +1,119 @@
+[
+  {
+    "identifier": {
+      "type": "YARN_APPLICATION",
+      "id": "application_1532614298307_0002"
+    },
+    "info": {
+      "YARN_APPLICATION_VIEW_ACLS": ""
+    },
+    "configs": {},
+    "metrics": [],
+    "events": [
+      {
+        "id": "YARN_APPLICATION_ACLS_UPDATED",
+        "info": {},
+        "timestamp": 1532614542444,
+        "valid": true,
+        "infoJAXB": {}
+      }
+    ],
+    "id": "application_1532614298307_0002",
+    "type": "YARN_APPLICATION",
+    "valid": true,
+    "configsJAXB": {},
+    "infoJAXB": {
+      "YARN_APPLICATION_VIEW_ACLS": ""
+    },
+    "relatesToEntitiesJAXB": {},
+    "isRelatedToEntitiesJAXB": {},
+    "isrelatedto": {},
+    "relatesto": {},
+    "createdtime": null,
+    "idprefix": 0
+  },
+  {
+    "identifier": {
+      "type": "YARN_APPLICATION_ATTEMPT",
+      "id": "appattempt_1532614298307_0002_000001"
+    },
+    "info": {
+      "YARN_APPLICATION_ATTEMPT_HOST": "test_user/10.171.19.36",
+      "YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER": "container_1532614298307_0002_01_000001",
+      "YARN_APPLICATION_ATTEMPT_ORIGINAL_TRACKING_URL": "N/A",
+      "YARN_APPLICATION_ATTEMPT_RPC_PORT": -1,
+      "YARN_APPLICATION_ATTEMPT_TRACKING_URL": "http://test_user:8088/proxy/application_1532614298307_0002/"
+    },
+    "configs": {},
+    "metrics": [],
+    "events": [
+      {
+        "id": "YARN_APPLICATION_ATTEMPT_REGISTERED",
+        "info": {},
+        "timestamp": 1532614551262,
+        "valid": true,
+        "infoJAXB": {}
+      }
+    ],
+    "id": "appattempt_1532614298307_0002_000001",
+    "type": "YARN_APPLICATION_ATTEMPT",
+    "valid": true,
+    "configsJAXB": {},
+    "infoJAXB": {
+      "YARN_APPLICATION_ATTEMPT_HOST": "test_user/10.171.19.36",
+      "YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER": "container_1532614298307_0002_01_000001",
+      "YARN_APPLICATION_ATTEMPT_ORIGINAL_TRACKING_URL": "N/A",
+      "YARN_APPLICATION_ATTEMPT_RPC_PORT": -1,
+      "YARN_APPLICATION_ATTEMPT_TRACKING_URL": "http://test_user:8088/proxy/application_1532614298307_0002/"
+    },
+    "relatesToEntitiesJAXB": {},
+    "isRelatedToEntitiesJAXB": {},
+    "isrelatedto": {},
+    "relatesto": {},
+    "createdtime": 1532614551262,
+    "idprefix": 0
+  },
+  {
+    "identifier": {
+      "type": "YARN_CONTAINER",
+      "id": "container_1532614298307_0002_01_000001"
+    },
+    "info": {
+      "YARN_CONTAINER_ALLOCATED_PORT": 2032,
+      "YARN_CONTAINER_ALLOCATED_MEMORY": 1024,
+      "YARN_CONTAINER_ALLOCATED_PRIORITY": 0,
+      "YARN_CONTAINER_ALLOCATED_HOST": "test_user",
+      "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS": "http://test_user:8042",
+      "YARN_CONTAINER_ALLOCATED_VCORE": 1
+    },
+    "configs": {},
+    "metrics": [],
+    "events": [
+      {
+        "id": "YARN_RM_CONTAINER_CREATED",
+        "info": {},
+        "timestamp": 1532614543389,
+        "valid": true,
+        "infoJAXB": {}
+      }
+    ],
+    "id": "container_1532614298307_0002_01_000001",
+    "type": "YARN_CONTAINER",
+    "valid": true,
+    "configsJAXB": {},
+    "infoJAXB": {
+      "YARN_CONTAINER_ALLOCATED_PORT": 2032,
+      "YARN_CONTAINER_ALLOCATED_MEMORY": 1024,
+      "YARN_CONTAINER_ALLOCATED_PRIORITY": 0,
+      "YARN_CONTAINER_ALLOCATED_HOST": "test_user",
+      "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS": "http://test_user:8042",
+      "YARN_CONTAINER_ALLOCATED_VCORE": 1
+    },
+    "relatesToEntitiesJAXB": {},
+    "isRelatedToEntitiesJAXB": {},
+    "isrelatedto": {},
+    "relatesto": {},
+    "createdtime": 1532614543389,
+    "idprefix": 0
+  }
+]
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/metrics/PerNodeAggTimelineCollectorMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/metrics/PerNodeAggTimelineCollectorMetrics.java
index 0da9258..024c61c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/metrics/PerNodeAggTimelineCollectorMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/metrics/PerNodeAggTimelineCollectorMetrics.java
@@ -62,7 +62,7 @@ final public class PerNodeAggTimelineCollectorMetrics {
       synchronized (PerNodeAggTimelineCollectorMetrics.class) {
         if (instance == null) {
           instance =
-              DefaultMetricsSystem.initialize("TimelineService").register(
+              DefaultMetricsSystem.instance().register(
                   METRICS_INFO.name(), METRICS_INFO.description(),
                   new PerNodeAggTimelineCollectorMetrics());
           isInitialized.set(true);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
index 2f51023..0855105 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
@@ -46,5 +46,6 @@
     <module>hadoop-yarn-server-timelineservice-hbase</module>
     <module>hadoop-yarn-server-timelineservice-hbase-tests</module>
     <module>hadoop-yarn-server-router</module>
+    <module>hadoop-yarn-server-timelineservice-documentstore</module>
   </modules>
 </project>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org