You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2019/03/13 23:45:31 UTC
[hadoop] branch trunk updated: YARN-9016 DocumentStore as a backend
for ATSv2. Contributed by Sushil Ks.
This is an automated email from the ASF dual-hosted git repository.
vrushali pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new f235a94 YARN-9016 DocumentStore as a backend for ATSv2. Contributed by Sushil Ks.
f235a94 is described below
commit f235a942d5b7cab86f0cb4f5ba285f07cd939a40
Author: Vrushali C <vr...@apache.org>
AuthorDate: Wed Mar 13 16:45:23 2019 -0700
YARN-9016 DocumentStore as a backend for ATSv2. Contributed by Sushil Ks.
---
.../main/resources/assemblies/hadoop-yarn-dist.xml | 5 +
.../pom.xml | 150 +++++++
.../DocumentStoreCollectionCreator.java | 66 +++
.../DocumentStoreTimelineReaderImpl.java | 121 +++++
.../DocumentStoreTimelineWriterImpl.java | 285 ++++++++++++
.../documentstore/DocumentStoreUtils.java | 489 +++++++++++++++++++++
.../documentstore/collection/CollectionType.java | 44 ++
.../document/NoDocumentFoundException.java | 39 ++
.../collection/document/TimelineDocument.java | 37 ++
.../document/entity/TimelineEntityDocument.java | 242 ++++++++++
.../document/entity/TimelineEventSubDoc.java | 96 ++++
.../document/entity/TimelineMetricSubDoc.java | 167 +++++++
.../collection/document/entity/package-info.java | 30 ++
.../flowactivity/FlowActivityDocument.java | 131 ++++++
.../document/flowactivity/FlowActivitySubDoc.java | 73 +++
.../document/flowactivity/package-info.java | 29 ++
.../document/flowrun/FlowRunDocument.java | 239 ++++++++++
.../collection/document/flowrun/package-info.java | 29 ++
.../collection/document/package-info.java | 30 ++
.../documentstore/collection/package-info.java | 30 ++
.../documentstore/lib/DocumentStoreFactory.java | 96 ++++
.../lib/DocumentStoreNotSupportedException.java | 35 ++
.../documentstore/lib/DocumentStoreVendor.java | 39 ++
.../documentstore/lib/package-info.java | 30 ++
.../documentstore/package-info.java | 29 ++
.../documentstore/reader/DocumentStoreReader.java | 45 ++
.../reader/TimelineCollectionReader.java | 220 +++++++++
.../cosmosdb/CosmosDBDocumentStoreReader.java | 232 ++++++++++
.../reader/cosmosdb/package-info.java | 28 ++
.../documentstore/reader/package-info.java | 29 ++
.../documentstore/writer/DocumentStoreWriter.java | 35 ++
.../writer/TimelineCollectionWriter.java | 146 ++++++
.../cosmosdb/CosmosDBDocumentStoreWriter.java | 235 ++++++++++
.../writer/cosmosdb/package-info.java | 28 ++
.../documentstore/writer/package-info.java | 29 ++
.../documentstore/DocumentStoreTestUtils.java | 81 ++++
.../timelineservice/documentstore/JsonUtils.java | 59 +++
.../TestDocumentStoreCollectionCreator.java | 64 +++
.../TestDocumentStoreTimelineReaderImpl.java | 407 +++++++++++++++++
.../TestDocumentStoreTimelineWriterImpl.java | 90 ++++
.../collection/TestDocumentOperations.java | 177 ++++++++
.../reader/DummyDocumentStoreReader.java | 118 +++++
.../writer/DummyDocumentStoreWriter.java | 46 ++
.../test/resources/documents/flowactivity-doc.json | 20 +
.../src/test/resources/documents/flowrun-doc.json | 126 ++++++
.../documents/test-timeline-entities-doc.json | 185 ++++++++
.../test/resources/documents/timeline-app-doc.json | 203 +++++++++
.../resources/documents/timeline-entities.json | 119 +++++
.../PerNodeAggTimelineCollectorMetrics.java | 2 +-
.../hadoop-yarn/hadoop-yarn-server/pom.xml | 1 +
50 files changed, 5285 insertions(+), 1 deletion(-)
diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
index a5c3c0e..4da4ac5 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml
@@ -224,6 +224,10 @@
<directory>hadoop-yarn/hadoop-yarn-csi/target/lib</directory>
<outputDirectory>share/hadoop/${hadoop.component}/csi/lib</outputDirectory>
</fileSet>
+ <fileSet>
+ <directory>hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/target/lib</directory>
+ <outputDirectory>share/hadoop/${hadoop.component}/timelineservice/lib</outputDirectory>
+ </fileSet>
</fileSets>
<moduleSets>
<moduleSet>
@@ -231,6 +235,7 @@
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice</include>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-client</include>
<include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common</include>
+ <include>org.apache.hadoop:hadoop-yarn-server-timelineservice-documentstore</include>
</includes>
<binaries>
<outputDirectory>share/hadoop/${hadoop.component}/timelineservice</outputDirectory>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml
new file mode 100644
index 0000000..be668e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/pom.xml
@@ -0,0 +1,150 @@
+<?xml version="1.0"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>hadoop-yarn-server</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <version>3.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hadoop-yarn-server-timelineservice-documentstore</artifactId>
+ <name>Apache Hadoop YARN TimelineService DocumentStore</name>
+
+ <properties>
+ <!-- Needed for generating FindBugs warnings using parent pom -->
+ <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+ <azure.documentdb.version>1.16.2</azure.documentdb.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-documentdb</artifactId>
+ <version>${azure.documentdb.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.8.9</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>1.7.1</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.7.1</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Apache RAT -->
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/test/resources/documents/flowactivity-doc.json</exclude>
+ <exclude>src/test/resources/documents/flowrun-doc.json</exclude>
+ <exclude>src/test/resources/documents/test-timeline-entities-doc.json</exclude>
+ <exclude>src/test/resources/documents/timeline-app-doc.json</exclude>
+ <exclude>src/test/resources/documents/timeline-entities.json</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <additionalDependencies>
+ <additionnalDependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ </additionnalDependency>
+ </additionalDependencies>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeScope>runtime</includeScope>
+ <excludeGroupIds>org.slf4j,org.apache.hadoop,com.github.stephenc.findbugs</excludeGroupIds>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreCollectionCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreCollectionCreator.java
new file mode 100755
index 0000000..15d1e3e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreCollectionCreator.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.SchemaCreator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This creates the Collection for a {@link DocumentStoreVendor} backend
+ * configured for storing application timeline information.
+ */
+public class DocumentStoreCollectionCreator implements SchemaCreator {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(DocumentStoreCollectionCreator.class);
+
+
+ @Override
+ public void createTimelineSchema(String[] args) {
+ try {
+
+ Configuration conf = new YarnConfiguration();
+
+ LOG.info("Creating database and collections for DocumentStore : {}",
+ DocumentStoreUtils.getStoreVendor(conf));
+
+ try(DocumentStoreWriter documentStoreWriter = DocumentStoreFactory
+ .createDocumentStoreWriter(conf)) {
+ documentStoreWriter.createDatabase();
+ documentStoreWriter.createCollection(
+ CollectionType.APPLICATION.getCollectionName());
+ documentStoreWriter.createCollection(
+ CollectionType.ENTITY.getCollectionName());
+ documentStoreWriter.createCollection(
+ CollectionType.FLOW_ACTIVITY.getCollectionName());
+ documentStoreWriter.createCollection(
+ CollectionType.FLOW_RUN.getCollectionName());
+ }
+ } catch (Exception e) {
+ LOG.error("Error while creating Timeline Collections", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java
new file mode 100755
index 0000000..2159132
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.TimelineCollectionReader;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is a generic document store timeline reader for reading the timeline
+ * entity information. Based on the {@link DocumentStoreVendor} that is
+ * configured, the documents are read from that backend.
+ */
+public class DocumentStoreTimelineReaderImpl
+ extends AbstractService implements TimelineReader {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(DocumentStoreTimelineReaderImpl.class);
+
+ private TimelineCollectionReader collectionReader;
+
+ public DocumentStoreTimelineReaderImpl() {
+ super(DocumentStoreTimelineReaderImpl.class.getName());
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ DocumentStoreVendor storeType = DocumentStoreUtils.getStoreVendor(conf);
+ LOG.info("Initializing Document Store Reader for : " + storeType);
+ collectionReader = new TimelineCollectionReader(conf);
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ super.serviceStop();
+ LOG.info("Stopping Document Timeline Store reader...");
+ collectionReader.close();
+ }
+
+ public TimelineEntity getEntity(TimelineReaderContext context,
+ TimelineDataToRetrieve dataToRetrieve) throws IOException {
+ TimelineEntityDocument timelineEntityDoc;
+ switch (TimelineEntityType.valueOf(context.getEntityType())) {
+ case YARN_FLOW_ACTIVITY:
+ case YARN_FLOW_RUN:
+ timelineEntityDoc =
+ collectionReader.readDocument(context);
+ return DocumentStoreUtils.createEntityToBeReturned(
+ timelineEntityDoc, dataToRetrieve.getConfsToRetrieve(),
+ dataToRetrieve.getMetricsToRetrieve());
+ default:
+ timelineEntityDoc =
+ collectionReader.readDocument(context);
+ }
+ return DocumentStoreUtils.createEntityToBeReturned(
+ timelineEntityDoc, dataToRetrieve);
+ }
+
+ public Set<TimelineEntity> getEntities(TimelineReaderContext context,
+ TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+ throws IOException {
+ List<TimelineEntityDocument> entityDocs =
+ collectionReader.readDocuments(context, filters.getLimit());
+
+ return applyFilters(filters, dataToRetrieve, entityDocs);
+ }
+
+ public Set<String> getEntityTypes(TimelineReaderContext context) {
+ return collectionReader.fetchEntityTypes(context);
+ }
+
+ // for honoring all filters from {@link TimelineEntityFilters}
+ private Set<TimelineEntity> applyFilters(TimelineEntityFilters filters,
+ TimelineDataToRetrieve dataToRetrieve,
+ List<TimelineEntityDocument> entityDocs) throws IOException {
+ Set<TimelineEntity> timelineEntities = new HashSet<>();
+ for (TimelineEntityDocument entityDoc : entityDocs) {
+ final TimelineEntity timelineEntity = entityDoc.fetchTimelineEntity();
+
+ if (DocumentStoreUtils.isFilterNotMatching(filters, timelineEntity)) {
+ continue;
+ }
+
+ TimelineEntity entityToBeReturned = DocumentStoreUtils
+ .createEntityToBeReturned(entityDoc, dataToRetrieve);
+ timelineEntities.add(entityToBeReturned);
+ }
+ return timelineEntities;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java
new file mode 100755
index 0000000..572d888
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.*;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.TimelineCollectionWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ * Generic document store timeline writer for storing timeline entity
+ * information. The documents are written to the backend selected by the
+ * configured {@link DocumentStoreVendor}.
+ */
+public class DocumentStoreTimelineWriterImpl extends AbstractService
+    implements TimelineWriter {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DocumentStoreTimelineWriterImpl.class);
+  /** Separator used when appending the sub application user to a user id. */
+  private static final String DOC_ID_DELIMITER = "!";
+
+  private DocumentStoreVendor storeType;
+  // One writer per collection; all are created in serviceInit and stay null
+  // until then.
+  private TimelineCollectionWriter<TimelineEntityDocument> appCollWriter;
+  private TimelineCollectionWriter<TimelineEntityDocument>
+      entityCollWriter;
+  private TimelineCollectionWriter<FlowActivityDocument> flowActivityCollWriter;
+  private TimelineCollectionWriter<FlowRunDocument> flowRunCollWriter;
+
+
+  public DocumentStoreTimelineWriterImpl() {
+    super(DocumentStoreTimelineWriterImpl.class.getName());
+  }
+
+  /**
+   * Resolves the configured store vendor and creates one collection writer
+   * for each of the application, entity, flow activity and flow run
+   * collections.
+   *
+   * @param conf configuration used to resolve the vendor and build writers.
+   * @throws Exception if the vendor lookup or a writer creation fails.
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    storeType = DocumentStoreUtils.getStoreVendor(conf);
+    LOG.info("Initializing Document Store Writer for : {}", storeType);
+    super.serviceInit(conf);
+
+    this.appCollWriter = new TimelineCollectionWriter<>(
+        CollectionType.APPLICATION, conf);
+    this.entityCollWriter = new TimelineCollectionWriter<>(
+        CollectionType.ENTITY, conf);
+    this.flowActivityCollWriter = new TimelineCollectionWriter<>(
+        CollectionType.FLOW_ACTIVITY, conf);
+    this.flowRunCollWriter = new TimelineCollectionWriter<>(
+        CollectionType.FLOW_RUN, conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the service and closes every collection writer that was created.
+   *
+   * @throws Exception if stopping the service or closing a writer fails.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+    // A writer is still null when serviceInit did not get as far as
+    // creating it, so each one is guarded before being closed.
+    if (appCollWriter != null) {
+      appCollWriter.close();
+    }
+    if (entityCollWriter != null) {
+      entityCollWriter.close();
+    }
+    if (flowActivityCollWriter != null) {
+      flowActivityCollWriter.close();
+    }
+    if (flowRunCollWriter != null) {
+      flowRunCollWriter.close();
+    }
+  }
+
+  /**
+   * Writes every non-null entity in {@code data}. Application entities go
+   * to the application collection together with a flow run document and,
+   * when a created or finished event is present, a flow activity document.
+   * All other entities go to the entity collection.
+   *
+   * @param context collector context providing cluster, user, flow and app.
+   * @param data entities to be written.
+   * @param callerUgi caller whose short user name is stored as the sub
+   *          application user.
+   * @return a new, empty write response; also returned without writing
+   *         anything when a field needed for the document id is missing.
+   */
+  @Override
+  public TimelineWriteResponse write(TimelineCollectorContext
+      context, TimelineEntities data, UserGroupInformation callerUgi) {
+    LOG.debug("Writing Timeline Entity for appID : {}", context.getAppId());
+    TimelineWriteResponse putStatus = new TimelineWriteResponse();
+    String subApplicationUser = callerUgi.getShortUserName();
+
+    //Avoiding NPE for document id
+    if (DocumentStoreUtils.isNullOrEmpty(context.getFlowName(),
+        context.getAppId(), context.getClusterId(), context.getUserId())) {
+      // Pass the values as arguments so the placeholders are actually filled.
+      LOG.warn("Found NULL for one of: flowName={} appId={} "
+          + "userId={} clusterId={} . Not proceeding on writing to store : {}",
+          context.getFlowName(), context.getAppId(), context.getUserId(),
+          context.getClusterId(), storeType);
+      return putStatus;
+    }
+
+    for (TimelineEntity timelineEntity : data.getEntities()) {
+      // a set can have at most 1 null
+      if (timelineEntity == null) {
+        continue;
+      }
+
+      TimelineEntityDocument entityDocument;
+      //If the entity is application, it will be stored in Application
+      // Collection
+      if (ApplicationEntity.isApplicationEntity(timelineEntity)) {
+        entityDocument = createTimelineEntityDoc(context, subApplicationUser,
+            timelineEntity, true);
+        // if it's an application entity, store metrics for aggregation
+        FlowRunDocument flowRunDoc = createFlowRunDoc(context,
+            timelineEntity.getMetrics());
+        // fetch flow activity if App is created or finished
+        FlowActivityDocument flowActivityDoc = getFlowActivityDoc(context,
+            timelineEntity, flowRunDoc, entityDocument);
+        writeApplicationDoc(entityDocument);
+        writeFlowRunDoc(flowRunDoc);
+        if (flowActivityDoc != null) {
+          storeFlowActivityDoc(flowActivityDoc);
+        }
+      } else {
+        entityDocument = createTimelineEntityDoc(context, subApplicationUser,
+            timelineEntity, false);
+        appendSubAppUserIfExists(context, subApplicationUser);
+        // The entity will be stored in Entity Collection
+        entityDocument.setCreatedTime(fetchEntityCreationTime(timelineEntity));
+        writeEntityDoc(entityDocument);
+      }
+    }
+    return putStatus;
+  }
+
+  /**
+   * Domain writes are not implemented by this writer.
+   *
+   * @return always {@code null}.
+   */
+  @Override
+  public TimelineWriteResponse write(TimelineCollectorContext context,
+      TimelineDomain domain) throws IOException {
+    return null;
+  }
+
+  /**
+   * Appends the sub application user to the user id held by the context
+   * when the user id neither equals nor contains it. This mutates the
+   * context.
+   */
+  private void appendSubAppUserIfExists(TimelineCollectorContext context,
+      String subApplicationUser) {
+    String userId = context.getUserId();
+    // NOTE(review): contains() is a substring match, so a sub application
+    // user that is a substring of the user id is never appended - confirm
+    // this is intended.
+    if (!userId.equals(subApplicationUser) &&
+        !userId.contains(subApplicationUser)) {
+      userId = userId.concat(DOC_ID_DELIMITER).concat(subApplicationUser);
+      context.setUserId(userId);
+    }
+  }
+
+  /**
+   * Builds the entity document for the given entity. The document id of an
+   * application entity is derived from the context and entity type; for
+   * every other entity the entity id is part of the document id as well.
+   */
+  private TimelineEntityDocument createTimelineEntityDoc(
+      TimelineCollectorContext context, String subApplicationUser,
+      TimelineEntity timelineEntity, boolean isAppEntity) {
+    TimelineEntityDocument entityDocument =
+        new TimelineEntityDocument(timelineEntity);
+    entityDocument.setContext(context);
+    entityDocument.setFlowVersion(context.getFlowVersion());
+    entityDocument.setSubApplicationUser(subApplicationUser);
+    if (isAppEntity) {
+      entityDocument.setId(DocumentStoreUtils.constructTimelineEntityDocId(
+          context, timelineEntity.getType()));
+    } else {
+      entityDocument.setId(DocumentStoreUtils.constructTimelineEntityDocId(
+          context, timelineEntity.getType(), timelineEntity.getId()));
+    }
+    return entityDocument;
+  }
+
+  /** Builds the flow run document carrying the application's metrics. */
+  private FlowRunDocument createFlowRunDoc(TimelineCollectorContext context,
+      Set<TimelineMetric> metrics) {
+    FlowRunDocument flowRunDoc = new FlowRunDocument(context, metrics);
+    flowRunDoc.setFlowVersion(context.getFlowVersion());
+    flowRunDoc.setId(DocumentStoreUtils.constructFlowRunDocId(context));
+    return flowRunDoc;
+  }
+
+  /**
+   * Resolves the creation time of an entity. Containers use the timestamp
+   * of their created event and application attempts that of their
+   * registered event, when present. Otherwise the entity's own created
+   * time is used, or 0 when that is not set.
+   */
+  private long fetchEntityCreationTime(TimelineEntity timelineEntity) {
+    // Compare against the enum names instead of TimelineEntityType.valueOf():
+    // valueOf() throws IllegalArgumentException for entity types that are not
+    // one of the enum constants, and NullPointerException for a null type.
+    String entityType = timelineEntity.getType();
+    TimelineEvent event = null;
+    if (TimelineEntityType.YARN_CONTAINER.name().equals(entityType)) {
+      event = DocumentStoreUtils.fetchEvent(
+          timelineEntity, ContainerMetricsConstants.CREATED_EVENT_TYPE);
+    } else if (TimelineEntityType.YARN_APPLICATION_ATTEMPT.name()
+        .equals(entityType)) {
+      event = DocumentStoreUtils.fetchEvent(
+          timelineEntity, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+    }
+    if (event != null) {
+      return event.getTimestamp();
+    }
+    if (timelineEntity.getCreatedTime() == null) {
+      return 0;
+    }
+    return timelineEntity.getCreatedTime();
+  }
+
+  /**
+   * Builds the flow activity document for an application entity and, as a
+   * side effect, records the created time on the entity document and the
+   * min start / max end time on the flow run document.
+   *
+   * @return the flow activity document, or {@code null} when the entity
+   *         carries neither a created nor a finished event.
+   */
+  private FlowActivityDocument getFlowActivityDoc(
+      TimelineCollectorContext context,
+      TimelineEntity timelineEntity, FlowRunDocument flowRunDoc,
+      TimelineEntityDocument entityDocument) {
+    FlowActivityDocument flowActivityDoc = null;
+    // check if the application is created
+    TimelineEvent event = DocumentStoreUtils.fetchEvent(
+        timelineEntity, ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    if (event != null) {
+      entityDocument.setCreatedTime(event.getTimestamp());
+      flowRunDoc.setMinStartTime(event.getTimestamp());
+      flowActivityDoc = createFlowActivityDoc(context, context.getFlowName(),
+          context.getFlowVersion(), context.getFlowRunId(), event);
+    }
+
+    // if application has finished, store its finish time
+    event = DocumentStoreUtils.fetchEvent(timelineEntity,
+        ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    if (event != null) {
+      flowRunDoc.setMaxEndTime(event.getTimestamp());
+
+      // this check is to handle in case both create and finish event exist
+      // under the single list of events for an TimelineEntity
+      if (flowActivityDoc == null) {
+        flowActivityDoc = createFlowActivityDoc(context, context.getFlowName(),
+            context.getFlowVersion(), context.getFlowRunId(), event);
+      }
+    }
+    return flowActivityDoc;
+  }
+
+  /**
+   * Builds a flow activity document whose day timestamp and id are derived
+   * from the timestamp of the given event.
+   */
+  private FlowActivityDocument createFlowActivityDoc(
+      TimelineCollectorContext context, String flowName, String flowVersion,
+      long flowRunId, TimelineEvent event) {
+    FlowActivityDocument flowActivityDoc = new FlowActivityDocument(flowName,
+        flowVersion, flowRunId);
+    flowActivityDoc.setDayTimestamp(DocumentStoreUtils.getTopOfTheDayTimestamp(
+        event.getTimestamp()));
+    flowActivityDoc.setFlowName(flowName);
+    flowActivityDoc.setUser(context.getUserId());
+    flowActivityDoc.setId(DocumentStoreUtils.constructFlowActivityDocId(
+        context, event.getTimestamp()));
+    return flowActivityDoc;
+  }
+
+  private void writeFlowRunDoc(FlowRunDocument flowRunDoc) {
+    flowRunCollWriter.writeDocument(flowRunDoc);
+  }
+
+  private void storeFlowActivityDoc(FlowActivityDocument flowActivityDoc) {
+    flowActivityCollWriter.writeDocument(flowActivityDoc);
+  }
+
+  private void writeEntityDoc(TimelineEntityDocument entityDocument) {
+    entityCollWriter.writeDocument(entityDocument);
+  }
+
+  private void writeApplicationDoc(TimelineEntityDocument entityDocument) {
+    appCollWriter.writeDocument(entityDocument);
+  }
+
+  /**
+   * Aggregation is not implemented by this writer.
+   *
+   * @return always {@code null}.
+   */
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) {
+    return null;
+  }
+
+  /** No-op: this writer keeps no buffered state of its own to flush. */
+  @Override
+  public void flush() {
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java
new file mode 100755
index 0000000..4b14d47
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreUtils.java
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import com.microsoft.azure.documentdb.ConnectionPolicy;
+import com.microsoft.azure.documentdb.ConsistencyLevel;
+import com.microsoft.azure.documentdb.DocumentClient;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEventSubDoc;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineMetricSubDoc;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * This class consists of all the utils required for reading or writing
+ * documents for a {@link DocumentStoreVendor}.
+ */
+public final class DocumentStoreUtils {
+
+ // Utility class holding only static members; not meant to be instantiated.
+ private DocumentStoreUtils(){}
+
+ /** milliseconds in one day. */
+ private static final long MILLIS_ONE_DAY = 86400000L;
+
+ // Configuration key that selects the document store vendor.
+ private static final String TIMELINE_STORE_TYPE =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "document-store-type";
+ // Configuration key holding the CosmosDB endpoint.
+ static final String TIMELINE_SERVICE_COSMOSDB_ENDPOINT =
+ "yarn.timeline-service.document-store.cosmos-db.endpoint";
+ // Configuration key holding the CosmosDB master key.
+ static final String TIMELINE_SERVICE_COSMOSDB_MASTER_KEY =
+ "yarn.timeline-service.document-store.cosmos-db.masterkey";
+ // Configuration key holding the database name of the document store.
+ static final String TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME =
+ "yarn.timeline-service.document-store.db-name";
+ // Suffix of the default database name; it is prefixed with the cluster id.
+ private static final String
+ DEFAULT_TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME = "timeline_service";
+
+ /**
+  * Verifies that the CosmosDB endpoint and master key are both present and
+  * non-empty in the given configuration.
+  * @param conf
+  *          yarn configuration to inspect
+  * @throws NullPointerException if {@code conf} is null
+  * @throws YarnException if the endpoint or the master key is missing or
+  *           empty
+  */
+ public static void validateCosmosDBConf(Configuration conf)
+     throws YarnException {
+   if (conf == null) {
+     throw new NullPointerException("Configuration cannot be null");
+   }
+   final String endpoint = conf.get(TIMELINE_SERVICE_COSMOSDB_ENDPOINT);
+   final String masterKey = conf.get(TIMELINE_SERVICE_COSMOSDB_MASTER_KEY);
+   if (!isNullOrEmpty(endpoint, masterKey)) {
+     return;
+   }
+   throw new YarnException("One or more CosmosDB configuration property is" +
+       " missing in yarn-site.xml");
+ }
+
+ /**
+  * Resolves the configured {@link DocumentStoreVendor}.
+  * @param conf
+  *          yarn configuration to read the store type from
+  * @return the vendor named by the store type property; when the property
+  *         is not set, the name of {@link DocumentStoreVendor#COSMOS_DB} is
+  *         used for the lookup
+  */
+ public static DocumentStoreVendor getStoreVendor(Configuration conf) {
+   final String defaultVendor = DocumentStoreVendor.COSMOS_DB.name();
+   final String configuredVendor =
+       conf.get(TIMELINE_STORE_TYPE, defaultVendor);
+   return DocumentStoreVendor.getStoreType(configuredVendor);
+ }
+
+ /**
+  * Looks up the first event of {@link TimelineEntity#getEvents()} whose id
+  * equals the requested event type.
+  * @param timelineEntity
+  *          entity whose events are examined
+  * @param eventType
+  *          event id to look for
+  * @return the matching {@link TimelineEvent}, or null if there is none
+  */
+ public static TimelineEvent fetchEvent(TimelineEntity timelineEntity,
+     String eventType) {
+   for (TimelineEvent candidate : timelineEntity.getEvents()) {
+     if (!candidate.getId().equals(eventType)) {
+       continue;
+     }
+     return candidate;
+   }
+   return null;
+ }
+
+ /**
+  * Checks whether any of the given strings is null or empty.
+  * @param values
+  *          strings to be checked
+  * @return true if at least one of the strings is null or empty, false if
+  *         all of them are non-empty (also false when no string is passed)
+  */
+ public static boolean isNullOrEmpty(String...values) {
+   for (int i = 0; i < values.length; i++) {
+     final String current = values[i];
+     if (current == null || current.length() == 0) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ /**
+  * Creates a CosmosDB {@link DocumentClient} using the default connection
+  * policy and session consistency.
+  * @param conf
+  *          configuration providing the cosmos db endpoint and master key
+  * @return document client for CosmosDB
+  */
+ public static DocumentClient createCosmosDBClient(Configuration conf){
+   final String endpoint = getCosmosDBEndpoint(conf);
+   final String masterKey = getCosmosDBMasterKey(conf);
+   return new DocumentClient(endpoint, masterKey,
+       ConnectionPolicy.GetDefault(), ConsistencyLevel.Session);
+ }
+
+ /**
+  * Rounds a millisecond timestamp down to the start of its day by removing
+  * the remainder of the division by the number of milliseconds in one day.
+  * No time zone offset is applied.
+  *
+  * @param timeStamp timestamp in milliseconds.
+  * @return timestamp of that day's beginning (midnight)
+  */
+ public static long getTopOfTheDayTimestamp(long timeStamp) {
+   final long millisIntoDay = timeStamp % MILLIS_ONE_DAY;
+   return timeStamp - millisIntoDay;
+ }
+
+ /**
+  * Creates a composite key for storing {@link TimelineEntityDocument}.
+  * @param collectorContext
+  *          of the timeline writer
+  * @param type
+  *          of the entity
+  * @return composite key delimited with !, made of cluster id, user id,
+  *         flow name, flow run id, app id and entity type
+  */
+ public static String constructTimelineEntityDocId(TimelineCollectorContext
+     collectorContext, String type) {
+   // The flow run id is formatted with %s (as constructFlowRunDocId does)
+   // rather than %d: %d applies locale-specific digits, which would make the
+   // document id depend on the JVM's default locale.
+   return String.format("%s!%s!%s!%s!%s!%s",
+       collectorContext.getClusterId(), collectorContext.getUserId(),
+       collectorContext.getFlowName(), collectorContext.getFlowRunId(),
+       collectorContext.getAppId(), type);
+ }
+
+ /**
+  * Creates a composite key for storing {@link TimelineEntityDocument}.
+  * @param collectorContext
+  *          of the timeline writer
+  * @param type
+  *          of the entity
+  * @param id
+  *          of the entity
+  * @return composite key delimited with !, made of cluster id, user id,
+  *         flow name, flow run id, app id, entity type and entity id
+  */
+ public static String constructTimelineEntityDocId(TimelineCollectorContext
+     collectorContext, String type, String id) {
+   // The flow run id is formatted with %s (as constructFlowRunDocId does)
+   // rather than %d: %d applies locale-specific digits, which would make the
+   // document id depend on the JVM's default locale.
+   return String.format("%s!%s!%s!%s!%s!%s!%s",
+       collectorContext.getClusterId(), collectorContext.getUserId(),
+       collectorContext.getFlowName(), collectorContext.getFlowRunId(),
+       collectorContext.getAppId(), type, id);
+ }
+
+ /**
+  * Builds the composite key under which a {@link FlowRunDocument} is stored.
+  * @param collectorContext
+  *          of the timeline writer
+  * @return cluster id, user id, flow name and flow run id joined with !
+  */
+ public static String constructFlowRunDocId(TimelineCollectorContext
+     collectorContext) {
+   return collectorContext.getClusterId()
+       + "!" + collectorContext.getUserId()
+       + "!" + collectorContext.getFlowName()
+       + "!" + collectorContext.getFlowRunId();
+ }
+
+ /**
+  * Builds the composite key under which a {@link FlowActivityDocument} is
+  * stored.
+  * @param collectorContext
+  *          of the timeline writer
+  * @param eventTimestamp
+  *          of the timeline entity; only its top-of-the-day value is used
+  * @return cluster id, day timestamp, user id and flow name joined with !
+  */
+ public static String constructFlowActivityDocId(TimelineCollectorContext
+     collectorContext, long eventTimestamp) {
+   return collectorContext.getClusterId()
+       + "!" + getTopOfTheDayTimestamp(eventTimestamp)
+       + "!" + collectorContext.getUserId()
+       + "!" + collectorContext.getFlowName();
+ }
+
+ /** Reads the CosmosDB endpoint property; no default value is supplied. */
+ private static String getCosmosDBEndpoint(Configuration conf) {
+ return conf.get(TIMELINE_SERVICE_COSMOSDB_ENDPOINT);
+ }
+
+ /** Reads the CosmosDB master key property; no default value is supplied. */
+ private static String getCosmosDBMasterKey(Configuration conf) {
+ return conf.get(TIMELINE_SERVICE_COSMOSDB_MASTER_KEY);
+ }
+
+ /**
+  * Returns the database name of the document store.
+  * @param conf
+  *          yarn configuration to read the name from
+  * @return the configured database name, or the cluster id followed by
+  *         "_timeline_service" when none is configured
+  */
+ public static String getCosmosDBDatabaseName(Configuration conf) {
+   final String fallbackName = getDefaultTimelineServiceDBName(conf);
+   return conf.get(TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME,
+       fallbackName);
+ }
+
+ private static String getDefaultTimelineServiceDBName(
+ Configuration conf) {
+ return getClusterId(conf) + "_" +
+ DEFAULT_TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME;
+ }
+
+ /**
+ * Reads the RM cluster id from the configuration, falling back to
+ * {@link YarnConfiguration#DEFAULT_RM_CLUSTER_ID} when it is not set.
+ */
+ private static String getClusterId(Configuration conf) {
+ return conf.get(YarnConfiguration.RM_CLUSTER_ID,
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+ }
+
+ /** Tells whether {@code time} lies in [timeBegin, timeEnd], inclusive. */
+ private static boolean isTimeInRange(long time, long timeBegin,
+     long timeEnd) {
+   if (time < timeBegin) {
+     return false;
+   }
+   return time <= timeEnd;
+ }
+
+ /**
+ * Checks if the {@link TimelineEntityFilters} are not matching for a given
+ * {@link TimelineEntity}. The checks run in a fixed order (created time,
+ * relates-to, is-related-to, info, config, metric, event) and the method
+ * returns true as soon as one of them fails. A filter list that is null or
+ * empty is skipped.
+ * @param filters
+ * that has to be checked for an entity
+ * @param timelineEntity
+ * for which the filters would be applied
+ * @return true if any one of the filter is not matching else false
+ * @throws IOException if an unsupported filter is being matched.
+ */
+ static boolean isFilterNotMatching(TimelineEntityFilters filters,
+ TimelineEntity timelineEntity) throws IOException {
+ // The created time window is only enforced when the entity has a created
+ // time; entities without one pass this check.
+ if (timelineEntity.getCreatedTime() != null && !isTimeInRange(timelineEntity
+ .getCreatedTime(), filters.getCreatedTimeBegin(),
+ filters.getCreatedTimeEnd())) {
+ return true;
+ }
+
+ // relates-to filters
+ if (filters.getRelatesTo() != null &&
+ !filters.getRelatesTo().getFilterList().isEmpty() &&
+ !TimelineStorageUtils.matchRelatesTo(timelineEntity,
+ filters.getRelatesTo())) {
+ return true;
+ }
+
+ // is-related-to filters
+ if (filters.getIsRelatedTo() != null &&
+ !filters.getIsRelatedTo().getFilterList().isEmpty() &&
+ !TimelineStorageUtils.matchIsRelatedTo(timelineEntity,
+ filters.getIsRelatedTo())) {
+ return true;
+ }
+
+ // info filters
+ if (filters.getInfoFilters() != null &&
+ !filters.getInfoFilters().getFilterList().isEmpty() &&
+ !TimelineStorageUtils.matchInfoFilters(timelineEntity,
+ filters.getInfoFilters())) {
+ return true;
+ }
+
+ // config filters
+ if (filters.getConfigFilters() != null &&
+ !filters.getConfigFilters().getFilterList().isEmpty() &&
+ !TimelineStorageUtils.matchConfigFilters(timelineEntity,
+ filters.getConfigFilters())) {
+ return true;
+ }
+
+ // metric filters
+ if (filters.getMetricFilters() != null &&
+ !filters.getMetricFilters().getFilterList().isEmpty() &&
+ !TimelineStorageUtils.matchMetricFilters(timelineEntity,
+ filters.getMetricFilters())) {
+ return true;
+ }
+
+ // event filters are the last check, so their outcome is the result
+ return filters.getEventFilters() != null &&
+ !filters.getEventFilters().getFilterList().isEmpty() &&
+ !TimelineStorageUtils.matchEventFilters(timelineEntity,
+ filters.getEventFilters());
+ }
+
+ /**
+  * Builds the entity that is handed back to the reader for a stored
+  * document. Identifier, created time and info are always copied; further
+  * fields are copied only when {@code dataToRetrieve} names fields to
+  * retrieve.
+  * @param timelineEntityDocument
+  *          which has all the information for the entity
+  * @param dataToRetrieve
+  *          specifies filters and fields to retrieve
+  * @return {@link TimelineEntity} as the result
+  */
+ public static TimelineEntity createEntityToBeReturned(
+     TimelineEntityDocument timelineEntityDocument,
+     TimelineDataToRetrieve dataToRetrieve) {
+   final String entityType = timelineEntityDocument.getType();
+   final TimelineEntity result = createTimelineEntity(entityType,
+       timelineEntityDocument.fetchTimelineEntity());
+
+   result.setIdentifier(new TimelineEntity.Identifier(entityType,
+       timelineEntityDocument.getId()));
+   result.setCreatedTime(timelineEntityDocument.getCreatedTime());
+   result.setInfo(timelineEntityDocument.getInfo());
+
+   if (dataToRetrieve.getFieldsToRetrieve() == null) {
+     return result;
+   }
+   fillFields(result, timelineEntityDocument, dataToRetrieve);
+   return result;
+ }
+
+ /**
+  * Returns the entity wrapped by the document after applying the config and
+  * metric selections to it. The wrapped entity itself is modified and
+  * returned; no copy is made.
+  * @param timelineEntityDocument
+  *          which has all the information for the entity
+  * @param confsToRetrieve
+  *          specifies config filters to be applied, ignored when null
+  * @param metricsToRetrieve
+  *          specifies metric filters to be applied, ignored when null
+  *
+  * @return {@link TimelineEntity} as the result
+  */
+ public static TimelineEntity createEntityToBeReturned(
+     TimelineEntityDocument timelineEntityDocument,
+     TimelineFilterList confsToRetrieve,
+     TimelineFilterList metricsToRetrieve) {
+   final TimelineEntity entity =
+       timelineEntityDocument.fetchTimelineEntity();
+   if (confsToRetrieve != null) {
+     final Map<String, String> selectedConfigs =
+         applyConfigFilter(confsToRetrieve, entity.getConfigs());
+     entity.setConfigs(selectedConfigs);
+   }
+   if (metricsToRetrieve != null) {
+     final Set<TimelineMetric> selectedMetrics = transformMetrics(
+         metricsToRetrieve, timelineEntityDocument.getMetrics());
+     entity.setMetrics(selectedMetrics);
+   }
+   return entity;
+ }
+
+ private static TimelineEntity createTimelineEntity(String type,
+ TimelineEntity timelineEntity) {
+ switch (TimelineEntityType.valueOf(type)) {
+ case YARN_APPLICATION:
+ return new ApplicationEntity();
+ case YARN_FLOW_RUN:
+ return new FlowRunEntity();
+ case YARN_FLOW_ACTIVITY:
+ FlowActivityEntity flowActivityEntity =
+ (FlowActivityEntity) timelineEntity;
+ FlowActivityEntity newFlowActivity = new FlowActivityEntity();
+ newFlowActivity.addFlowRuns(flowActivityEntity.getFlowRuns());
+ return newFlowActivity;
+ default:
+ return new TimelineEntity();
+ }
+ }
+
+ /**
+  * Copies the requested fields from the stored document onto the entity
+  * that is returned to the caller. A request for
+  * {@link TimelineReader.Field#ALL} is expanded to every field.
+  * @param finalEntity
+  *          entity being populated
+  * @param entityDoc
+  *          stored document the values are read from
+  * @param dataToRetrieve
+  *          names the fields to copy and the config/metric selections
+  */
+ private static void fillFields(TimelineEntity finalEntity,
+     TimelineEntityDocument entityDoc,
+     TimelineDataToRetrieve dataToRetrieve) {
+   EnumSet<TimelineReader.Field> fieldsToRetrieve =
+       dataToRetrieve.getFieldsToRetrieve();
+   if (fieldsToRetrieve.contains(TimelineReader.Field.ALL)) {
+     fieldsToRetrieve = EnumSet.allOf(TimelineReader.Field.class);
+   }
+   for (TimelineReader.Field field : fieldsToRetrieve) {
+     switch(field) {
+     case CONFIGS:
+       finalEntity.setConfigs(applyConfigFilter(dataToRetrieve
+           .getConfsToRetrieve(), entityDoc.getConfigs()));
+       break;
+     case METRICS:
+       finalEntity.setMetrics(transformMetrics(dataToRetrieve
+           .getMetricsToRetrieve(), entityDoc.getMetrics()));
+       break;
+     case INFO:
+       finalEntity.setInfo(entityDoc.getInfo());
+       break;
+     case IS_RELATED_TO:
+       finalEntity.setIsRelatedToEntities(entityDoc.getIsRelatedToEntities());
+       break;
+     case RELATES_TO:
+       // relates-to must be filled from the relates-to entities, not from
+       // the is-related-to entities
+       finalEntity.setRelatesToEntities(entityDoc.getRelatesToEntities());
+       break;
+     case EVENTS:
+       finalEntity.setEvents(transformEvents(entityDoc.getEvents().values()));
+       break;
+     default:
+       // fields without a dedicated case need no copying
+       break;
+     }
+   }
+ }
+
+ /* Transforms Collection<Set<TimelineEventSubDoc>> to
+ NavigableSet<TimelineEvent> */
+ private static NavigableSet<TimelineEvent> transformEvents(
+ Collection<Set<TimelineEventSubDoc>> eventSetColl) {
+ NavigableSet<TimelineEvent> timelineEvents = new TreeSet<>();
+ for (Set<TimelineEventSubDoc> eventSubDocs : eventSetColl) {
+ for (TimelineEventSubDoc eventSubDoc : eventSubDocs) {
+ timelineEvents.add(eventSubDoc.fetchTimelineEvent());
+ }
+ }
+ return timelineEvents;
+ }
+
+ /**
+  * Converts the stored metric sub documents into {@link TimelineMetric}s.
+  * The selection is all-or-nothing: every metric is returned when
+  * {@code metricsToRetrieve} is null or is satisfied by the metric ids,
+  * otherwise an empty set is returned.
+  * @param metricsToRetrieve
+  *          metric selection, may be null
+  * @param metrics
+  *          metric sub documents keyed by metric id
+  * @return a new set holding the converted metrics, possibly empty
+  */
+ public static Set<TimelineMetric> transformMetrics(
+     TimelineFilterList metricsToRetrieve,
+     Map<String, Set<TimelineMetricSubDoc>> metrics) {
+   final Set<TimelineMetric> converted = new HashSet<>();
+   if (metricsToRetrieve != null &&
+       !hasDataToBeRetrieve(metricsToRetrieve, metrics.keySet())) {
+     return converted;
+   }
+   for (Set<TimelineMetricSubDoc> group : metrics.values()) {
+     for (TimelineMetricSubDoc subDoc : group) {
+       converted.add(subDoc.fetchTimelineMetric());
+     }
+   }
+   return converted;
+ }
+
+ /**
+  * Applies the config selection to the given configs. The selection is
+  * all-or-nothing: the passed map itself is returned when
+  * {@code configsToRetrieve} is null or is satisfied by the config keys,
+  * otherwise a new empty map is returned.
+  * @param configsToRetrieve
+  *          config selection, may be null
+  * @param configs
+  *          configs of the entity
+  * @return {@code configs} or an empty map
+  */
+ public static Map<String, String> applyConfigFilter(
+     TimelineFilterList configsToRetrieve, Map<String, String> configs) {
+   if (configsToRetrieve != null &&
+       !hasDataToBeRetrieve(configsToRetrieve, configs.keySet())) {
+     return new HashMap<>();
+   }
+   return configs;
+ }
+
+ /**
+  * Decides whether the given keys satisfy the filter list. The prefix of
+  * every filter is collected and compared against the keys by equality:
+  * for OR at least one prefix has to be among the keys, for AND all of
+  * them. A filter list without filters is always satisfied for OR and AND;
+  * any other operator yields false.
+  * NOTE(review): every filter is cast to {@link TimelinePrefixFilter}, so a
+  * list holding another filter type fails with a ClassCastException.
+  * @param timelineFilters
+  *          filter list whose prefixes are examined
+  * @param dataSet
+  *          keys available for the entity
+  * @return true if the keys satisfy the filter list
+  */
+ private static boolean hasDataToBeRetrieve(
+     TimelineFilterList timelineFilters, Set<String> dataSet) {
+   Set<String> dataToBeRetrieved = new HashSet<>();
+   for (TimelineFilter timelineFilter : timelineFilters.getFilterList()) {
+     TimelinePrefixFilter timelinePrefixFilter =
+         (TimelinePrefixFilter) timelineFilter;
+     dataToBeRetrieved.add(timelinePrefixFilter.getPrefix());
+   }
+   // Each case returns explicitly; nothing falls through to the next one.
+   switch (timelineFilters.getOperator()) {
+   case OR:
+     return dataToBeRetrieved.isEmpty() ||
+         !Collections.disjoint(dataSet, dataToBeRetrieved);
+   case AND:
+     return dataToBeRetrieved.isEmpty() ||
+         dataSet.containsAll(dataToBeRetrieved);
+   default:
+     return false;
+   }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/CollectionType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/CollectionType.java
new file mode 100755
index 0000000..f40e2c2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/CollectionType.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection;
+
+/**
+ * Enumerates the collections that are currently used for storing documents,
+ * together with the name of each collection.
+ */
+public enum CollectionType {
+  ENTITY("EntityCollection"),
+  APPLICATION("AppCollection"),
+  FLOW_RUN("FlowRunCollection"),
+  FLOW_ACTIVITY("FlowActivityCollection");
+
+  private final String collectionName;
+
+  CollectionType(String name) {
+    collectionName = name;
+  }
+
+  /**
+   * @return name of the collection
+   */
+  public String getCollectionName() {
+    return collectionName;
+  }
+
+  /**
+   * Compares the name of this collection with the given name. This is an
+   * overload taking a String, not an override of
+   * {@link Object#equals(Object)}.
+   * @param otherCollectionName
+   *          collection name to compare with
+   * @return true if the names are equal
+   */
+  public boolean equals(String otherCollectionName) {
+    return collectionName.equals(otherCollectionName);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/NoDocumentFoundException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/NoDocumentFoundException.java
new file mode 100755
index 0000000..f6c123d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/NoDocumentFoundException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document;
+
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+
+import java.io.IOException;
+
+/**
+ * Indicates that the document that was requested is not found from the
+ * Document Store. This is a generic exception that will be thrown for all
+ * the {@link DocumentStoreVendor} if there is no document while reading.
+ * It is a checked exception because it extends {@link IOException}.
+ */
+public class NoDocumentFoundException extends IOException {
+
+ /**
+ * Constructs exception with the specified detail message.
+ * @param message detailed message.
+ */
+ public NoDocumentFoundException(String message) {
+ super(message);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/TimelineDocument.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/TimelineDocument.java
new file mode 100755
index 0000000..c0d53f0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/TimelineDocument.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document;
+
+
+/**
+ * This is an interface for all the Timeline Documents. Any new document that
+ * has to be persisted in the document store should implement this.
+ *
+ * @param <Document> type of the document accepted by
+ * {@link #merge(Object)}
+ */
+public interface TimelineDocument<Document> {
+
+ /** @return id of the document. */
+ String getId();
+
+ /** @return type of the document. */
+ String getType();
+
+ /** @return created time of the document. */
+ long getCreatedTime();
+
+ /** @param time created time to set on the document. */
+ void setCreatedTime(long time);
+
+ /** @param timelineDocument document to merge into this document. */
+ void merge(Document timelineDocument);
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEntityDocument.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEntityDocument.java
new file mode 100755
index 0000000..ea72ee3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEntityDocument.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.TimelineContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This is a generic class which contains all the meta information of some
+ * conceptual entity and its related events. The timeline entity can be an
+ * application, an attempt, a container or whatever the user-defined object.
+ * Most accessors delegate to a wrapped {@link TimelineEntity}; metrics and
+ * events are additionally kept as sub documents grouped by their id.
+ */
+public class TimelineEntityDocument implements
+    TimelineDocument<TimelineEntityDocument> {
+
+  private final TimelineEntity timelineEntity;
+  private TimelineContext context;
+  private String flowVersion;
+  private String subApplicationUser;
+  // metric id -> sub documents of the metrics with that id
+  private final Map<String, Set<TimelineMetricSubDoc>>
+      metrics = new HashMap<>();
+  // event id -> sub documents of the events with that id
+  private final Map<String, Set<TimelineEventSubDoc>>
+      events = new HashMap<>();
+
+  /** Creates a document that wraps a new, empty {@link TimelineEntity}. */
+  public TimelineEntityDocument() {
+    timelineEntity = new TimelineEntity();
+  }
+
+  /**
+   * Creates a document that wraps the given entity and groups its events
+   * and metrics into sub documents.
+   * @param timelineEntity
+   *          entity to wrap; it is referenced, not copied
+   */
+  public TimelineEntityDocument(TimelineEntity timelineEntity) {
+    this.timelineEntity = timelineEntity;
+    transformEvents(timelineEntity.getEvents());
+    transformMetrics(timelineEntity.getMetrics());
+  }
+
+  // transforms TimelineMetric to TimelineMetricSubDoc, grouped by metric id
+  private void transformMetrics(Set<TimelineMetric> timelineMetrics) {
+    for (TimelineMetric timelineMetric : timelineMetrics) {
+      Set<TimelineMetricSubDoc> metricSet =
+          this.metrics.get(timelineMetric.getId());
+      if (metricSet == null) {
+        metricSet = new HashSet<>();
+        this.metrics.put(timelineMetric.getId(), metricSet);
+      }
+      metricSet.add(new TimelineMetricSubDoc(timelineMetric));
+    }
+  }
+
+  // transforms TimelineEvent to TimelineEventSubDoc, grouped by event id
+  private void transformEvents(Set<TimelineEvent> timelineEvents) {
+    for (TimelineEvent timelineEvent : timelineEvents) {
+      Set<TimelineEventSubDoc> eventSet =
+          this.events.get(timelineEvent.getId());
+      if (eventSet == null) {
+        eventSet = new HashSet<>();
+        this.events.put(timelineEvent.getId(), eventSet);
+      }
+      eventSet.add(new TimelineEventSubDoc(timelineEvent));
+    }
+  }
+
+  /**
+   * Merge the TimelineEntityDocument that is passed with the current
+   * document for upsert. The created time is taken over only when the new
+   * document has one greater than zero. Metrics and events are added to the
+   * existing ones. Info, configs and both relation maps are merged with
+   * {@code putAll}, so a value of the new document replaces an existing
+   * value stored under the same key.
+   *
+   * @param newTimelineDocument
+   *          that has to be merged
+   */
+  @Override
+  public void merge(TimelineEntityDocument newTimelineDocument) {
+    if(newTimelineDocument.getCreatedTime() > 0) {
+      timelineEntity.setCreatedTime(newTimelineDocument.getCreatedTime());
+    }
+    setMetrics(newTimelineDocument.getMetrics());
+    setEvents(newTimelineDocument.getEvents());
+    timelineEntity.getInfo().putAll(newTimelineDocument.getInfo());
+    timelineEntity.getConfigs().putAll(newTimelineDocument.getConfigs());
+    timelineEntity.getIsRelatedToEntities().putAll(newTimelineDocument
+        .getIsRelatedToEntities());
+    timelineEntity.getRelatesToEntities().putAll(newTimelineDocument
+        .getRelatesToEntities());
+  }
+
+  @Override
+  public String getId() {
+    return timelineEntity.getId();
+  }
+
+  public void setId(String key) {
+    timelineEntity.setId(key);
+  }
+
+  @Override
+  public String getType() {
+    return timelineEntity.getType();
+  }
+
+  public void setType(String type) {
+    timelineEntity.setType(type);
+  }
+
+  /**
+   * Returns the info map of the wrapped entity. As a side effect every call
+   * stores the document id in that map under
+   * {@link TimelineReaderUtils#FROMID_KEY}.
+   * @return info map of the wrapped entity
+   */
+  public Map<String, Object> getInfo() {
+    timelineEntity.getInfo().put(TimelineReaderUtils.FROMID_KEY, getId());
+    return timelineEntity.getInfo();
+  }
+
+  public void setInfo(Map<String, Object> info) {
+    timelineEntity.setInfo(info);
+  }
+
+  public Map<String, Set<TimelineMetricSubDoc>> getMetrics() {
+    return metrics;
+  }
+
+  /**
+   * Adds the given metric sub documents to this document; despite the name
+   * existing metrics are kept. Each metric is also added to the wrapped
+   * entity.
+   * @param metrics
+   *          metric sub documents keyed by metric id
+   */
+  public void setMetrics(Map<String, Set<TimelineMetricSubDoc>> metrics) {
+    for (Map.Entry<String, Set<TimelineMetricSubDoc>> entry
+        : metrics.entrySet()) {
+      final Set<TimelineMetricSubDoc> incoming = entry.getValue();
+      for (TimelineMetricSubDoc metricSubDoc : incoming) {
+        timelineEntity.addMetric(metricSubDoc.fetchTimelineMetric());
+      }
+      final Set<TimelineMetricSubDoc> existing =
+          this.metrics.get(entry.getKey());
+      if (existing != null) {
+        existing.addAll(incoming);
+      } else {
+        this.metrics.put(entry.getKey(), new HashSet<>(incoming));
+      }
+    }
+  }
+
+  public Map<String, Set<TimelineEventSubDoc>> getEvents() {
+    return events;
+  }
+
+  /**
+   * Adds the given event sub documents to this document; despite the name
+   * existing events are kept. Each event is also added to the wrapped
+   * entity.
+   * @param events
+   *          event sub documents keyed by event id
+   */
+  public void setEvents(Map<String, Set<TimelineEventSubDoc>> events) {
+    for (Map.Entry<String, Set<TimelineEventSubDoc>> entry
+        : events.entrySet()) {
+      final Set<TimelineEventSubDoc> incoming = entry.getValue();
+      for (TimelineEventSubDoc eventSubDoc : incoming) {
+        timelineEntity.addEvent(eventSubDoc.fetchTimelineEvent());
+      }
+      final Set<TimelineEventSubDoc> existing =
+          this.events.get(entry.getKey());
+      if (existing != null) {
+        existing.addAll(incoming);
+      } else {
+        this.events.put(entry.getKey(), new HashSet<>(incoming));
+      }
+    }
+  }
+
+  public Map<String, String> getConfigs() {
+    return timelineEntity.getConfigs();
+  }
+
+  public void setConfigs(Map<String, String> configs) {
+    timelineEntity.setConfigs(configs);
+  }
+
+  public Map<String, Set<String>> getIsRelatedToEntities() {
+    return timelineEntity.getIsRelatedToEntities();
+  }
+
+  public void setIsRelatedToEntities(Map<String, Set<String>>
+      isRelatedToEntities) {
+    timelineEntity.setIsRelatedToEntities(isRelatedToEntities);
+  }
+
+  public Map<String, Set<String>> getRelatesToEntities() {
+    return timelineEntity.getRelatesToEntities();
+  }
+
+  public void setRelatesToEntities(Map<String, Set<String>> relatesToEntities) {
+    timelineEntity.setRelatesToEntities(relatesToEntities);
+  }
+
+  public String getFlowVersion() {
+    return flowVersion;
+  }
+
+  public void setFlowVersion(String flowVersion) {
+    this.flowVersion = flowVersion;
+  }
+
+  public void setIdentifier(TimelineEntity.Identifier identifier) {
+    timelineEntity.setIdentifier(identifier);
+  }
+
+  public void setIdPrefix(long idPrefix) {
+    timelineEntity.setIdPrefix(idPrefix);
+  }
+
+  public String getSubApplicationUser() {
+    return subApplicationUser;
+  }
+
+  public void setSubApplicationUser(String subApplicationUser) {
+    this.subApplicationUser = subApplicationUser;
+  }
+
+  /**
+   * @return created time of the wrapped entity, or 0 when it has none
+   */
+  @Override
+  public long getCreatedTime() {
+    if (timelineEntity.getCreatedTime() == null) {
+      return 0;
+    }
+    return timelineEntity.getCreatedTime();
+  }
+
+  @Override
+  public void setCreatedTime(long createdTime) {
+    timelineEntity.setCreatedTime(createdTime);
+  }
+
+  public TimelineContext getContext() {
+    return context;
+  }
+
+  public void setContext(TimelineContext context) {
+    this.context = context;
+  }
+
+  /**
+   * @return the wrapped entity itself, not a copy
+   */
+  public TimelineEntity fetchTimelineEntity() {
+    return timelineEntity;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java
new file mode 100755
index 0000000..cce64ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineEventSubDoc.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+
+import java.util.Map;
+
+/**
+ * This class represents a Sub Document for {@link TimelineEvent}
+ * when creating a new {@link TimelineEntityDocument}. It wraps a single
+ * {@link TimelineEvent} and exposes its id, timestamp and info through
+ * bean-style accessors. Equality and hashing are based on the event id only.
+ */
+public class TimelineEventSubDoc {
+
+  private final TimelineEvent timelineEvent;
+  // Only written by setValid(); isValid() always asks the wrapped event.
+  // NOTE(review): presumably kept so that a serialized "valid" property can
+  // be bound back without error - confirm against the store bindings.
+  private boolean valid;
+
+  /**
+   * Creates a sub document backed by a new, empty {@link TimelineEvent}.
+   */
+  public TimelineEventSubDoc() {
+    timelineEvent = new TimelineEvent();
+  }
+
+  /**
+   * Creates a sub document backed by the given event. The event is not
+   * copied, so later changes through this sub document are visible on it.
+   *
+   * @param timelineEvent the event to wrap
+   */
+  public TimelineEventSubDoc(TimelineEvent timelineEvent) {
+    this.timelineEvent = timelineEvent;
+  }
+
+  public String getId() {
+    return timelineEvent.getId();
+  }
+
+  public void setId(String eventId) {
+    timelineEvent.setId(eventId);
+  }
+
+  /**
+   * @return the validity reported by the wrapped event; the value passed to
+   *         {@link #setValid(boolean)} is not consulted
+   */
+  public boolean isValid() {
+    return timelineEvent.isValid();
+  }
+
+  public void setValid(boolean valid) {
+    this.valid = valid;
+  }
+
+  public long getTimestamp() {
+    return timelineEvent.getTimestamp();
+  }
+
+  public void setTimestamp(long ts) {
+    timelineEvent.setTimestamp(ts);
+  }
+
+  public Map<String, Object> getInfo() {
+    return timelineEvent.getInfo();
+  }
+
+  public void setInfo(Map<String, Object> info) {
+    timelineEvent.setInfo(TimelineServiceHelper.mapCastToHashMap(info));
+  }
+
+  /**
+   * Hash derived from the event id only, consistent with
+   * {@link #equals(Object)}. An event without an id hashes to 0 instead of
+   * throwing a {@link NullPointerException}.
+   */
+  @Override
+  public int hashCode() {
+    String eventId = timelineEvent.getId();
+    return eventId == null ? 0 : 31 * eventId.hashCode();
+  }
+
+  /**
+   * Only checks whether the ids are equal. Two sub documents that both lack
+   * an id are treated as equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TimelineEventSubDoc)) {
+      return false;
+    }
+    String eventId = timelineEvent.getId();
+    String otherEventId = ((TimelineEventSubDoc) obj).getId();
+    return eventId == null ? otherEventId == null
+        : eventId.equals(otherEventId);
+  }
+
+  /**
+   * @return the wrapped {@link TimelineEvent} instance itself, not a copy
+   */
+  public TimelineEvent fetchTimelineEvent() {
+    return timelineEvent;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java
new file mode 100755
index 0000000..f7d078f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/TimelineMetricSubDoc.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class represents a Sub Document for {@link TimelineMetric} that will be
+ * used when creating new {@link TimelineEntityDocument}. It wraps a single
+ * {@link TimelineMetric} and additionally keeps the single data timestamp and
+ * value captured at construction time. Equality and hashing are based on the
+ * metric id and type.
+ */
+public class TimelineMetricSubDoc {
+
+  private final TimelineMetric timelineMetric;
+  // Written by the wrapping constructor and setValid(); isValid() does not
+  // read it and derives its answer from the metric id instead.
+  private boolean valid;
+  private long singleDataTimestamp;
+  private Number singleDataValue = 0;
+
+  /**
+   * Creates a sub document backed by a new, empty {@link TimelineMetric}.
+   */
+  public TimelineMetricSubDoc() {
+    this.timelineMetric = new TimelineMetric();
+  }
+
+  /**
+   * Creates a sub document backed by the given metric. For a
+   * {@code SINGLE_VALUE} metric that has at least one value, the single data
+   * timestamp and value are captured from the metric.
+   *
+   * @param timelineMetric the metric to wrap (not copied)
+   */
+  public TimelineMetricSubDoc(TimelineMetric timelineMetric) {
+    this.timelineMetric = timelineMetric;
+    this.valid = timelineMetric.isValid();
+    if (timelineMetric.getType() == TimelineMetric.Type.SINGLE_VALUE &&
+        !timelineMetric.getValues().isEmpty()) {
+      this.singleDataTimestamp = timelineMetric.getSingleDataTimestamp();
+      this.singleDataValue = timelineMetric.getSingleDataValue();
+    }
+  }
+
+  /**
+   * Get the real time aggregation operation of this metric.
+   *
+   * @return Real time aggregation operation
+   */
+  public TimelineMetricOperation getRealtimeAggregationOp() {
+    return timelineMetric.getRealtimeAggregationOp();
+  }
+
+  /**
+   * Set the real time aggregation operation of this metric.
+   *
+   * @param op A timeline metric operation that the metric should perform on
+   *           real time aggregations
+   */
+  public void setRealtimeAggregationOp(
+      final TimelineMetricOperation op) {
+    timelineMetric.setRealtimeAggregationOp(op);
+  }
+
+  public String getId() {
+    return timelineMetric.getId();
+  }
+
+  public void setId(String metricId) {
+    timelineMetric.setId(metricId);
+  }
+
+  public void setSingleDataTimestamp(long singleDataTimestamp) {
+    this.singleDataTimestamp = singleDataTimestamp;
+  }
+
+  /**
+   * Get single data timestamp of the metric.
+   *
+   * @return the single data timestamp, or 0 when the metric is not of type
+   *         {@code SINGLE_VALUE}
+   */
+  public long getSingleDataTimestamp() {
+    if (timelineMetric.getType() == TimelineMetric.Type.SINGLE_VALUE) {
+      return singleDataTimestamp;
+    }
+    return 0;
+  }
+
+  /**
+   * Get single data value of the metric.
+   *
+   * @return the single data value, or {@code null} when the metric is not of
+   *         type {@code SINGLE_VALUE}
+   */
+  public Number getSingleDataValue() {
+    if (timelineMetric.getType() == TimelineMetric.Type.SINGLE_VALUE) {
+      return singleDataValue;
+    }
+    return null;
+  }
+
+  public void setSingleDataValue(Number singleDataValue) {
+    this.singleDataValue = singleDataValue;
+  }
+
+  public Map<Long, Number> getValues() {
+    return timelineMetric.getValues();
+  }
+
+  public void setValues(Map<Long, Number> vals) {
+    timelineMetric.setValues(vals);
+  }
+
+  // required by JAXB
+  public TreeMap<Long, Number> getValuesJAXB() {
+    return timelineMetric.getValuesJAXB();
+  }
+
+  public TimelineMetric.Type getType() {
+    return timelineMetric.getType();
+  }
+
+  public void setType(TimelineMetric.Type metricType) {
+    timelineMetric.setType(metricType);
+  }
+
+  public void setValid(boolean valid) {
+    this.valid = valid;
+  }
+
+  /**
+   * @return {@code true} when the wrapped metric has a non-null id; the value
+   *         passed to {@link #setValid(boolean)} is not consulted
+   */
+  public boolean isValid() {
+    return (timelineMetric.getId() != null);
+  }
+
+  /**
+   * Hash derived from the metric id and type, consistent with
+   * {@link #equals(Object)}. A missing id or type contributes 0 instead of
+   * causing a {@link NullPointerException}.
+   */
+  @Override
+  public int hashCode() {
+    String metricId = timelineMetric.getId();
+    TimelineMetric.Type metricType = timelineMetric.getType();
+    int result = metricId == null ? 0 : metricId.hashCode();
+    result = 31 * result + (metricType == null ? 0 : metricType.hashCode());
+    return result;
+  }
+
+  /**
+   * Only checks whether id and type are equal. Two sub documents that both
+   * lack an id are compared on type alone.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TimelineMetricSubDoc)) {
+      return false;
+    }
+    TimelineMetricSubDoc otherTimelineMetric = (TimelineMetricSubDoc) obj;
+    String metricId = timelineMetric.getId();
+    String otherMetricId = otherTimelineMetric.getId();
+    boolean sameId = metricId == null ? otherMetricId == null
+        : metricId.equals(otherMetricId);
+    return sameId
+        && this.timelineMetric.getType() == otherTimelineMetric.getType();
+  }
+
+  /**
+   * @return the wrapped {@link TimelineMetric} instance itself, not a copy
+   */
+  public TimelineMetric fetchTimelineMetric() {
+    return timelineMetric;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/package-info.java
new file mode 100644
index 0000000..dbafa9a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/entity/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection.document.entity contains
+ * TimelineEntityDocument, which is common to the different TimelineEntity
+ * types, i.e. Application, App Attempt, Container etc.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivityDocument.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivityDocument.java
new file mode 100755
index 0000000..264bfec
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivityDocument.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity;
+
+
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This doc represents the {@link FlowActivityEntity} which is used for
+ * showing all the flow runs with limited information.
+ */
+public class FlowActivityDocument implements
+ TimelineDocument<FlowActivityDocument> {
+
+ private String id;
+ private final String type = TimelineEntityType.YARN_FLOW_ACTIVITY.toString();
+ private Set<FlowActivitySubDoc> flowActivities = new HashSet<>();
+ private long dayTimestamp;
+ private String user;
+ private String flowName;
+
+ public FlowActivityDocument() {
+ }
+
+ public FlowActivityDocument(String flowName, String flowVersion,
+ long flowRunId) {
+ flowActivities.add(new FlowActivitySubDoc(flowName,
+ flowVersion, flowRunId));
+ }
+
+ /**
+ * Merge the {@link FlowActivityDocument} that is passed with the current
+ * document for upsert.
+ *
+ * @param flowActivityDocument
+ * that has to be merged
+ */
+ @Override
+ public void merge(FlowActivityDocument flowActivityDocument) {
+ if (flowActivityDocument.getDayTimestamp() > 0) {
+ this.dayTimestamp = flowActivityDocument.getDayTimestamp();
+ }
+ this.flowName = flowActivityDocument.getFlowName();
+ this.user = flowActivityDocument.getUser();
+ this.id = flowActivityDocument.getId();
+ this.flowActivities.addAll(flowActivityDocument.getFlowActivities());
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ public void addFlowActivity(String flowActivityName, String flowVersion,
+ long flowRunId) {
+ flowActivities.add(new FlowActivitySubDoc(flowActivityName,
+ flowVersion, flowRunId));
+ }
+
+ public Set<FlowActivitySubDoc> getFlowActivities() {
+ return flowActivities;
+ }
+
+ public void setFlowActivities(Set<FlowActivitySubDoc> flowActivities) {
+ this.flowActivities = flowActivities;
+ }
+
+ @Override
+ public long getCreatedTime() {
+ return TimeUnit.SECONDS.toMillis(dayTimestamp);
+ }
+
+ @Override
+ public void setCreatedTime(long time) {
+ }
+
+ public long getDayTimestamp() {
+ return dayTimestamp;
+ }
+
+ public void setDayTimestamp(long dayTimestamp) {
+ this.dayTimestamp = dayTimestamp;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public void setFlowName(String flowName) {
+ this.flowName = flowName;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivitySubDoc.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivitySubDoc.java
new file mode 100755
index 0000000..93b28e0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/FlowActivitySubDoc.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity;
+
+/**
+ * This is a sub doc which represents each flow.
+ *
+ * Two sub docs are equal when their flow run ids match and their flow
+ * versions match ignoring case. The flow name takes no part in
+ * {@link #equals(Object)} or {@link #hashCode()}.
+ */
+public class FlowActivitySubDoc {
+  private String flowName;
+  private String flowVersion;
+  private long flowRunId;
+
+  /**
+   * Creates an empty sub doc; all fields keep their default values.
+   */
+  public FlowActivitySubDoc() {
+  }
+
+  /**
+   * @param flowName    name of the flow
+   * @param flowVersion version of the flow
+   * @param flowRunId   run id of the flow
+   */
+  public FlowActivitySubDoc(String flowName, String flowVersion,
+      long flowRunId) {
+    this.flowName = flowName;
+    this.flowVersion = flowVersion;
+    this.flowRunId = flowRunId;
+  }
+
+  public String getFlowName() {
+    return flowName;
+  }
+
+  public String getFlowVersion() {
+    return flowVersion;
+  }
+
+  public long getFlowRunId() {
+    return flowRunId;
+  }
+
+  /**
+   * Hash over the case-folded flow version and the flow run id. The version
+   * is hashed case-insensitively because {@link #equals(Object)} compares it
+   * with {@code equalsIgnoreCase}; a case-sensitive hash would let two equal
+   * sub docs land in different hash buckets.
+   */
+  @Override
+  public int hashCode() {
+    int result = caseInsensitiveHash(flowVersion);
+    result = 31 * result + Long.hashCode(flowRunId);
+    return result;
+  }
+
+  /**
+   * Only checks whether flow version (ignoring case) and flow run id are
+   * equal. Two sub docs that both lack a flow version are compared on the
+   * run id alone.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof FlowActivitySubDoc)) {
+      return false;
+    }
+    FlowActivitySubDoc m = (FlowActivitySubDoc) o;
+    String otherVersion = m.getFlowVersion();
+    boolean sameVersion = flowVersion == null ? otherVersion == null
+        : flowVersion.equalsIgnoreCase(otherVersion);
+    return sameVersion && flowRunId == m.getFlowRunId();
+  }
+
+  /**
+   * Hashes a string so that any two strings equal under
+   * {@link String#equalsIgnoreCase(String)} get the same hash. Each char is
+   * folded upper-then-lower, the same folding that comparison applies.
+   */
+  private static int caseInsensitiveHash(String value) {
+    if (value == null) {
+      return 0;
+    }
+    int hash = 0;
+    for (int i = 0; i < value.length(); i++) {
+      hash = 31 * hash
+          + Character.toLowerCase(Character.toUpperCase(value.charAt(i)));
+    }
+    return hash;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/package-info.java
new file mode 100644
index 0000000..c1127dc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowactivity/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection.document.flowactivity contains
+ * FlowActivityDocument to audit all the flows at a day level.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/FlowRunDocument.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/FlowRunDocument.java
new file mode 100755
index 0000000..8ee87ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/FlowRunDocument.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineMetricSubDoc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This doc represents the flow run information for every job.
+ */
+public class FlowRunDocument implements TimelineDocument<FlowRunDocument> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(FlowRunDocument.class);
+
+ private String id;
+ private final String type = TimelineEntityType.YARN_FLOW_RUN.toString();
+ private String clusterId;
+ private String username;
+ private String flowName;
+ private Long flowRunId;
+ private String flowVersion;
+ private long minStartTime;
+ private long maxEndTime;
+ private final Map<String, TimelineMetricSubDoc>
+ metrics = new HashMap<>();
+
+ public FlowRunDocument() {
+ }
+
+ public FlowRunDocument(TimelineCollectorContext collectorContext,
+ Set<TimelineMetric> metrics) {
+ this.clusterId = collectorContext.getClusterId();
+ this.username = collectorContext.getUserId();
+ this.flowName = collectorContext.getFlowName();
+ this.flowRunId = collectorContext.getFlowRunId();
+ transformMetrics(metrics);
+ }
+
+ private void transformMetrics(Set<TimelineMetric> timelineMetrics) {
+ for (TimelineMetric metric : timelineMetrics) {
+ TimelineMetricSubDoc metricSubDoc = new TimelineMetricSubDoc(metric);
+ this.metrics.put(metric.getId(), metricSubDoc);
+ }
+ }
+
+ /**
+ * Merge the {@link FlowRunDocument} that is passed with the current
+ * document for upsert.
+ *
+ * @param flowRunDoc
+ * that has to be merged
+ */
+ @Override
+ public void merge(FlowRunDocument flowRunDoc) {
+ if (flowRunDoc.getMinStartTime() > 0) {
+ this.minStartTime = flowRunDoc.getMinStartTime();
+ }
+ if (flowRunDoc.getMaxEndTime() > 0) {
+ this.maxEndTime = flowRunDoc.getMaxEndTime();
+ }
+ this.clusterId = flowRunDoc.getClusterId();
+ this.flowName = flowRunDoc.getFlowName();
+ this.id = flowRunDoc.getId();
+ this.username = flowRunDoc.getUsername();
+ this.flowVersion = flowRunDoc.getFlowVersion();
+ this.flowRunId = flowRunDoc.getFlowRunId();
+ aggregateMetrics(flowRunDoc.getMetrics());
+ }
+
+ private void aggregateMetrics(
+ Map<String, TimelineMetricSubDoc> metricSubDocMap) {
+ for(String metricId : metricSubDocMap.keySet()) {
+ if (this.metrics.containsKey(metricId)) {
+ TimelineMetric incomingMetric =
+ metricSubDocMap.get(metricId).fetchTimelineMetric();
+ TimelineMetric baseMetric =
+ this.metrics.get(metricId).fetchTimelineMetric();
+ if (incomingMetric.getValues().size() > 0) {
+ baseMetric = aggregate(incomingMetric, baseMetric);
+ this.metrics.put(metricId, new TimelineMetricSubDoc(baseMetric));
+ } else {
+ LOG.debug("No incoming metric to aggregate for : {}",
+ baseMetric.getId());
+ }
+ } else {
+ this.metrics.put(metricId, metricSubDocMap.get(metricId));
+ }
+ }
+ }
+
+ private TimelineMetric aggregate(TimelineMetric incomingMetric,
+ TimelineMetric baseMetric) {
+ switch (baseMetric.getRealtimeAggregationOp()) {
+ case SUM:
+ baseMetric = TimelineMetricOperation.SUM
+ .aggregate(incomingMetric, baseMetric, null);
+ break;
+ case AVG:
+ baseMetric = TimelineMetricOperation.AVG
+ .aggregate(incomingMetric, baseMetric, null);
+ break;
+ case MAX:
+ baseMetric = TimelineMetricOperation.MAX
+ .aggregate(incomingMetric, baseMetric, null);
+ break;
+ case REPLACE:
+ baseMetric = TimelineMetricOperation.REPLACE
+ .aggregate(incomingMetric, baseMetric, null);
+ default:
+ //NoOP
+ }
+ return baseMetric;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(String clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getFlowName() {
+ return flowName;
+ }
+
+ public void setFlowName(String flowName) {
+ this.flowName = flowName;
+ }
+
+ public Long getFlowRunId() {
+ return flowRunId;
+ }
+
+ public void setFlowRunId(Long flowRunId) {
+ this.flowRunId = flowRunId;
+ }
+
+ public Map<String, TimelineMetricSubDoc> getMetrics() {
+ return metrics;
+ }
+
+ public void setMetrics(Map<String, TimelineMetricSubDoc> metrics) {
+ this.metrics.putAll(metrics);
+ }
+
+ public Set<TimelineMetric> fetchTimelineMetrics() {
+ Set<TimelineMetric> metricSet = new HashSet<>();
+ for(TimelineMetricSubDoc metricSubDoc : metrics.values()) {
+ metricSet.add(metricSubDoc.fetchTimelineMetric());
+ }
+ return metricSet;
+ }
+
+ public long getMinStartTime() {
+ return minStartTime;
+ }
+
+ public void setMinStartTime(long minStartTime) {
+ this.minStartTime = minStartTime;
+ }
+
+ public long getMaxEndTime() {
+ return maxEndTime;
+ }
+
+ public void setMaxEndTime(long maxEndTime) {
+ this.maxEndTime = maxEndTime;
+ }
+
+ @Override
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public long getCreatedTime() {
+ return minStartTime;
+ }
+
+ @Override
+ public void setCreatedTime(long createdTime) {
+ if(minStartTime == 0) {
+ minStartTime = createdTime;
+ }
+ }
+
+ public String getFlowVersion() {
+ return flowVersion;
+ }
+
+ public void setFlowVersion(String flowVersion) {
+ this.flowVersion = flowVersion;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/package-info.java
new file mode 100644
index 0000000..43c2ae0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/flowrun/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection.document.flowrun contains
+ * FlowRunDocument that stores the flow level information for
+ * each Application and also aggregates the metrics.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/package-info.java
new file mode 100644
index 0000000..ff3cf82
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/document/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection.document contains interface for all the
+ * Timeline Documents. Any new document that has to be persisted in
+ * the document store should implement this.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/package-info.java
new file mode 100644
index 0000000..663e8de
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.collection contains different collection types
+ * for storing documents.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreFactory.java
new file mode 100755
index 0000000..04c9bb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.lib;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb.CosmosDBDocumentStoreReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb.CosmosDBDocumentStoreWriter;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+
+import static org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils.getStoreVendor;
+
+/**
+ * Static factory for the timeline Document Store clients. The
+ * {@link DocumentStoreVendor} resolved from the configuration decides which
+ * concrete reader or writer gets instantiated.
+ */
+public final class DocumentStoreFactory {
+
+  /** Utility class: construction is intentionally blocked. */
+  private DocumentStoreFactory() {
+  }
+
+  /**
+   * Builds the {@link DocumentStoreWriter} matching the configured
+   * {@link DocumentStoreVendor}.
+   * @param conf
+   *          configuration used to resolve the vendor and to create the
+   *          client connection
+   * @param <Document> type of document the writer handles,
+   *          e.g. TimelineEntityDocument, FlowActivityDocument etc
+   * @return document store writer
+   * @throws DocumentStoreNotSupportedException if the configured
+   *          {@link DocumentStoreVendor} has no writer implementation or an
+   *          unknown {@link DocumentStoreVendor} is configured.
+   * @throws YarnException if the required configs for DocumentStore are
+   *          missing.
+   */
+  public static <Document extends TimelineDocument>
+      DocumentStoreWriter<Document> createDocumentStoreWriter(
+      Configuration conf) throws YarnException {
+    final DocumentStoreVendor vendor = getStoreVendor(conf);
+    final DocumentStoreWriter<Document> writer;
+    switch (vendor) {
+    case COSMOS_DB:
+      // the Cosmos DB settings are validated before the client is built
+      DocumentStoreUtils.validateCosmosDBConf(conf);
+      writer = new CosmosDBDocumentStoreWriter<>(conf);
+      break;
+    default:
+      throw new DocumentStoreNotSupportedException(
+          "Unable to create DocumentStoreWriter for type : " + vendor);
+    }
+    return writer;
+  }
+
+  /**
+   * Builds the {@link DocumentStoreReader} matching the configured
+   * {@link DocumentStoreVendor}.
+   * @param conf
+   *          configuration used to resolve the vendor and to create the
+   *          client connection
+   * @param <Document> type of document the reader handles,
+   *          e.g. TimelineEntityDocument, FlowActivityDocument etc
+   * @return document store reader
+   * @throws DocumentStoreNotSupportedException if the configured
+   *          {@link DocumentStoreVendor} has no reader implementation or an
+   *          unknown {@link DocumentStoreVendor} is configured.
+   * @throws YarnException if the required configs for DocumentStore are
+   *          missing.
+   */
+  public static <Document extends TimelineDocument>
+      DocumentStoreReader<Document> createDocumentStoreReader(
+      Configuration conf) throws YarnException {
+    final DocumentStoreVendor vendor = getStoreVendor(conf);
+    final DocumentStoreReader<Document> reader;
+    switch (vendor) {
+    case COSMOS_DB:
+      // the Cosmos DB settings are validated before the client is built
+      DocumentStoreUtils.validateCosmosDBConf(conf);
+      reader = new CosmosDBDocumentStoreReader<>(conf);
+      break;
+    default:
+      throw new DocumentStoreNotSupportedException(
+          "Unable to create DocumentStoreReader for type : " + vendor);
+    }
+    return reader;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreNotSupportedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreNotSupportedException.java
new file mode 100755
index 0000000..7fc9c6a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreNotSupportedException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.lib;
+
+/**
+ * Indicates that the document store vendor that was
+ * configured does not belong to one of the {@link DocumentStoreVendor}.
+ */
+public class DocumentStoreNotSupportedException extends
+ UnsupportedOperationException {
+
+ /**
+ * Constructs exception with the specified detail message.
+ * @param message detailed message.
+ */
+ public DocumentStoreNotSupportedException(String message) {
+ super(message);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreVendor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreVendor.java
new file mode 100755
index 0000000..e7612f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/DocumentStoreVendor.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.lib;
+
+/**
+ * Represents the different vendors for DocumentStore.
+ */
+public enum DocumentStoreVendor {
+
+  COSMOS_DB,
+  MONGO_DB,
+  ELASTIC_SEARCH;
+
+  /**
+   * Resolves a vendor from its textual name, ignoring case.
+   * @param storeTypeStr
+   *          vendor name to look up, compared case-insensitively against the
+   *          constant names; a null value matches no vendor
+   * @return the vendor whose name matches {@code storeTypeStr}
+   * @throws DocumentStoreNotSupportedException if no vendor matches
+   */
+  public static DocumentStoreVendor getStoreType(String storeTypeStr) {
+    for (DocumentStoreVendor storeType : DocumentStoreVendor.values()) {
+      if (storeType.name().equalsIgnoreCase(storeTypeStr)) {
+        // Return the constant that matched instead of re-resolving it through
+        // valueOf(storeTypeStr.toUpperCase()): toUpperCase() depends on the
+        // default locale (e.g. 'i' maps to a dotted capital I in Turkish),
+        // which made valueOf fail for a name that had just matched.
+        return storeType;
+      }
+    }
+    throw new DocumentStoreNotSupportedException(
+        storeTypeStr + " is not a valid DocumentStoreVendor");
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/package-info.java
new file mode 100644
index 0000000..83a8479
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/lib/package-info.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.
+ * documentstore.lib contains the factory class for instantiating
+ * DocumentStore reader and writer clients based on the configured
+ * DocumentStoreVendor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.lib;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/package-info.java
new file mode 100644
index 0000000..2803777
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore
+ * contains DocumentStore Reader and Writer Implementation of TimelineService
+ * for reading and writing documents from DocumentStore.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DocumentStoreReader.java
new file mode 100755
index 0000000..23f96cc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DocumentStoreReader.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader;
+
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.NoDocumentFoundException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Read-side contract that every {@link DocumentStoreVendor} has to implement
+ * in order to provide a reader for its backend.
+ *
+ * @param <Document> concrete {@link TimelineDocument} type served by the
+ *          reader
+ */
+public interface DocumentStoreReader<Document extends TimelineDocument>
+    extends AutoCloseable {
+
+  /**
+   * Reads a single document from a collection.
+   * @param collectionName
+   *          name of the collection to read from
+   * @param context
+   *          of the timeline reader
+   * @param documentClass
+   *          class of the document to be returned
+   * @return the document that was read
+   * @throws NoDocumentFoundException presumably when no document matches
+   *           the context; confirm against the vendor implementations
+   */
+  Document readDocument(
+      String collectionName,
+      TimelineReaderContext context,
+      Class<Document> documentClass) throws NoDocumentFoundException;
+
+  /**
+   * Reads a list of documents from a collection.
+   * @param collectionName
+   *          name of the collection to read from
+   * @param context
+   *          of the timeline reader
+   * @param documentClass
+   *          class of the documents to be returned
+   * @param documentsSize
+   *          size bound for the result; presumably the maximum number of
+   *          documents returned, confirm against the vendor implementations
+   * @return the documents that were read
+   * @throws NoDocumentFoundException presumably when no document matches
+   *           the context; confirm against the vendor implementations
+   */
+  List<Document> readDocumentList(
+      String collectionName,
+      TimelineReaderContext context,
+      Class<Document> documentClass,
+      long documentsSize) throws NoDocumentFoundException;
+
+  /**
+   * Fetches entity type names from a collection.
+   * @param collectionName
+   *          name of the collection to inspect
+   * @param context
+   *          of the timeline reader
+   * @return set of entity type names
+   */
+  Set<String> fetchEntityTypes(
+      String collectionName,
+      TimelineReaderContext context);
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/TimelineCollectionReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/TimelineCollectionReader.java
new file mode 100755
index 0000000..a8c2c8e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/TimelineCollectionReader.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivitySubDoc;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This is a generic Collection reader for reading documents belonging to a
+ * {@link CollectionType} under a specific {@link DocumentStoreVendor} backend.
+ * Applications, flow runs and flow activities are read from their own
+ * collections; all remaining entity types are read from the generic entity
+ * collection. Every result is handed back as a
+ * {@link TimelineEntityDocument}.
+ */
+public class TimelineCollectionReader {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TimelineCollectionReader.class);
+
+  private final DocumentStoreReader<TimelineEntityDocument>
+      genericEntityDocReader;
+  private final DocumentStoreReader<FlowRunDocument>
+      flowRunDocReader;
+  private final DocumentStoreReader<FlowActivityDocument>
+      flowActivityDocReader;
+
+  /**
+   * Creates one store reader per document kind (generic entity, flow run and
+   * flow activity) through {@link DocumentStoreFactory}.
+   * @param conf
+   *          configuration passed on to the factory
+   * @throws YarnException if the factory fails to create a reader
+   */
+  public TimelineCollectionReader(
+      Configuration conf) throws YarnException {
+    LOG.info("Initializing TimelineCollectionReader...");
+    genericEntityDocReader = DocumentStoreFactory
+        .createDocumentStoreReader(conf);
+    flowRunDocReader = DocumentStoreFactory
+        .createDocumentStoreReader(conf);
+    flowActivityDocReader = DocumentStoreFactory
+        .createDocumentStoreReader(conf);
+  }
+
+  /**
+   * Read a document from {@link DocumentStoreVendor} backend for
+   * a {@link CollectionType}.
+   * @param context
+   *          of the timeline reader
+   * @return TimelineEntityDocument as response
+   * @throws IOException on error while reading
+   */
+  public TimelineEntityDocument readDocument(
+      TimelineReaderContext context) throws IOException {
+    LOG.debug("Fetching document for entity type {}", context.getEntityType());
+    // NOTE(review): valueOf() rejects names that are not TimelineEntityType
+    // constants, so the default branch only serves enum-known types; confirm
+    // whether custom entity types are expected to reach this method.
+    switch (TimelineEntityType.valueOf(context.getEntityType())) {
+    case YARN_APPLICATION:
+      return genericEntityDocReader.readDocument(
+          CollectionType.APPLICATION.getCollectionName(), context,
+          TimelineEntityDocument.class);
+    case YARN_FLOW_RUN:
+      FlowRunDocument flowRunDoc = flowRunDocReader.readDocument(
+          CollectionType.FLOW_RUN.getCollectionName(), context,
+          FlowRunDocument.class);
+      FlowRunEntity flowRun = createFlowRunEntity(flowRunDoc);
+      return new TimelineEntityDocument(flowRun);
+    case YARN_FLOW_ACTIVITY:
+      // Read from the flow activity collection, the same collection that
+      // readDocuments() uses for this entity type (FLOW_RUN was used before).
+      FlowActivityDocument flowActivityDoc = flowActivityDocReader
+          .readDocument(CollectionType.FLOW_ACTIVITY.getCollectionName(),
+              context, FlowActivityDocument.class);
+      FlowActivityEntity flowActivity = createFlowActivityEntity(context,
+          flowActivityDoc);
+      return new TimelineEntityDocument(flowActivity);
+    default:
+      return genericEntityDocReader.readDocument(
+          CollectionType.ENTITY.getCollectionName(), context,
+          TimelineEntityDocument.class);
+    }
+  }
+
+  /**
+   * Read a list of documents from {@link DocumentStoreVendor} backend for
+   * a {@link CollectionType}.
+   * @param context
+   *          of the timeline reader
+   * @param documentsSize
+   *          to limit
+   * @return List of TimelineEntityDocument as response
+   * @throws IOException on error while reading
+   */
+  public List<TimelineEntityDocument> readDocuments(
+      TimelineReaderContext context, long documentsSize) throws IOException {
+    List<TimelineEntityDocument> entityDocs = new ArrayList<>();
+    LOG.debug("Fetching documents for entity type {}", context.getEntityType());
+    switch (TimelineEntityType.valueOf(context.getEntityType())) {
+    case YARN_APPLICATION:
+      return genericEntityDocReader.readDocumentList(
+          CollectionType.APPLICATION.getCollectionName(), context,
+          TimelineEntityDocument.class, documentsSize);
+    case YARN_FLOW_RUN:
+      List<FlowRunDocument> flowRunDocs = flowRunDocReader.readDocumentList(
+          CollectionType.FLOW_RUN.getCollectionName(), context,
+          FlowRunDocument.class, documentsSize);
+      // wrap every flow run document into an entity document
+      for (FlowRunDocument flowRunDoc : flowRunDocs) {
+        entityDocs.add(new TimelineEntityDocument(createFlowRunEntity(
+            flowRunDoc)));
+      }
+      return entityDocs;
+    case YARN_FLOW_ACTIVITY:
+      List<FlowActivityDocument> flowActivityDocs = flowActivityDocReader
+          .readDocumentList(CollectionType.FLOW_ACTIVITY.getCollectionName(),
+              context, FlowActivityDocument.class, documentsSize);
+      // wrap every flow activity document into an entity document
+      for (FlowActivityDocument flowActivityDoc : flowActivityDocs) {
+        entityDocs.add(new TimelineEntityDocument(
+            createFlowActivityEntity(context, flowActivityDoc)));
+      }
+      return entityDocs;
+    default:
+      return genericEntityDocReader.readDocumentList(
+          CollectionType.ENTITY.getCollectionName(), context,
+          TimelineEntityDocument.class, documentsSize);
+    }
+  }
+
+  /**
+   * Fetches the list of Entity Types i.e (YARN_CONTAINER,
+   * YARN_APPLICATION_ATTEMPT etc.) for an application Id.
+   * @param context
+   *          of the timeline reader
+   * @return List of EntityTypes as response
+   */
+  public Set<String> fetchEntityTypes(
+      TimelineReaderContext context) {
+    LOG.debug("Fetching all entity-types for appId : {}", context.getAppId());
+    return genericEntityDocReader.fetchEntityTypes(
+        CollectionType.ENTITY.getCollectionName(), context);
+  }
+
+  /**
+   * Builds a {@link FlowActivityEntity}, including one {@link FlowRunEntity}
+   * per activity sub document, out of a stored flow activity document.
+   * @param context
+   *          of the timeline reader, supplies the cluster id
+   * @param flowActivityDoc
+   *          document to convert
+   * @return the populated flow activity entity
+   */
+  private FlowActivityEntity createFlowActivityEntity(
+      TimelineReaderContext context, FlowActivityDocument flowActivityDoc) {
+    FlowActivityEntity flowActivity = new FlowActivityEntity(
+        context.getClusterId(), flowActivityDoc.getDayTimestamp(),
+        flowActivityDoc.getUser(), flowActivityDoc.getFlowName());
+    flowActivity.setId(flowActivityDoc.getId());
+    // get the list of run ids along with the version that are associated with
+    // this flow on this day
+    for (FlowActivitySubDoc activity : flowActivityDoc
+        .getFlowActivities()) {
+      FlowRunEntity flowRunEntity = new FlowRunEntity();
+      flowRunEntity.setUser(flowActivityDoc.getUser());
+      flowRunEntity.setName(activity.getFlowName());
+      flowRunEntity.setRunId(activity.getFlowRunId());
+      flowRunEntity.setVersion(activity.getFlowVersion());
+      // NOTE(review): presumably getId() derives the id from the fields set
+      // above and this call stores it; confirm against FlowRunEntity.
+      flowRunEntity.setId(flowRunEntity.getId());
+      flowActivity.addFlowRun(flowRunEntity);
+    }
+    flowActivity.getInfo().put(TimelineReaderUtils.FROMID_KEY,
+        flowActivityDoc.getId());
+    flowActivity.setCreatedTime(flowActivityDoc.getDayTimestamp());
+    return flowActivity;
+  }
+
+  /**
+   * Builds a {@link FlowRunEntity} out of a stored flow run document.
+   * Start time, end time and version are only copied when the document
+   * carries a positive time or a non-empty version respectively.
+   * @param flowRunDoc
+   *          document to convert
+   * @return the populated flow run entity
+   */
+  private FlowRunEntity createFlowRunEntity(FlowRunDocument flowRunDoc) {
+    FlowRunEntity flowRun = new FlowRunEntity();
+    flowRun.setRunId(flowRunDoc.getFlowRunId());
+    flowRun.setUser(flowRunDoc.getUsername());
+    flowRun.setName(flowRunDoc.getFlowName());
+
+    // read the start time
+    if (flowRunDoc.getMinStartTime() > 0) {
+      flowRun.setStartTime(flowRunDoc.getMinStartTime());
+    }
+
+    // read the end time if available
+    if (flowRunDoc.getMaxEndTime() > 0) {
+      flowRun.setMaxEndTime(flowRunDoc.getMaxEndTime());
+    }
+
+    // read the flow version
+    if (!DocumentStoreUtils.isNullOrEmpty(flowRunDoc.getFlowVersion())) {
+      flowRun.setVersion(flowRunDoc.getFlowVersion());
+    }
+    flowRun.setMetrics(flowRunDoc.fetchTimelineMetrics());
+    flowRun.setId(flowRunDoc.getId());
+    flowRun.getInfo().put(TimelineReaderUtils.FROMID_KEY, flowRunDoc.getId());
+    return flowRun;
+  }
+
+  /**
+   * Closes all three underlying store readers. Every reader is closed even
+   * if an earlier one fails; the first failure is rethrown with any later
+   * failures attached as suppressed exceptions.
+   * @throws Exception the first failure raised while closing a reader
+   */
+  public void close() throws Exception {
+    Exception failure = closeAndCollect(genericEntityDocReader, null);
+    failure = closeAndCollect(flowRunDocReader, failure);
+    failure = closeAndCollect(flowActivityDocReader, failure);
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  /**
+   * Closes one reader and folds a failure into the failures seen so far.
+   * @param reader
+   *          reader to close
+   * @param previous
+   *          failure raised by an earlier close, or null if there was none
+   * @return the failure to report: {@code previous} when set (with the new
+   *         failure added as suppressed), otherwise the new failure, or null
+   *         if nothing has failed
+   */
+  private static Exception closeAndCollect(AutoCloseable reader,
+      Exception previous) {
+    try {
+      reader.close();
+    } catch (Exception e) {
+      if (previous == null) {
+        return e;
+      }
+      previous.addSuppressed(e);
+    }
+    return previous;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java
new file mode 100755
index 0000000..64468ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/CosmosDBDocumentStoreReader.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb;
+
+import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.DocumentClient;
+import com.microsoft.azure.documentdb.FeedOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.NoDocumentFoundException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * This is the Document Store Reader implementation for
+ * {@link DocumentStoreVendor#COSMOS_DB}.
+ */
+public class CosmosDBDocumentStoreReader<TimelineDoc extends TimelineDocument>
+ implements DocumentStoreReader<TimelineDoc> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(CosmosDBDocumentStoreReader.class);
+ private static final int DEFAULT_DOCUMENTS_SIZE = 1;
+
+ private static DocumentClient client;
+ private final String databaseName;
+ private final static String COLLECTION_LINK = "/dbs/%s/colls/%s";
+ private final static String SELECT_TOP_FROM_COLLECTION = "SELECT TOP %d * " +
+ "FROM %s c";
+ private final static String SELECT_ALL_FROM_COLLECTION =
+ "SELECT * FROM %s c";
+ private final static String SELECT_DISTINCT_TYPES_FROM_COLLECTION =
+ "SELECT distinct c.type FROM %s c";
+ private static final String ENTITY_TYPE_COLUMN = "type";
+ private final static String WHERE_CLAUSE = " WHERE ";
+ private final static String AND_OPERATOR = " AND ";
+ private final static String CONTAINS_FUNC_FOR_ID = " CONTAINS(c.id, \"%s\") ";
+ private final static String CONTAINS_FUNC_FOR_TYPE = " CONTAINS(c.type, " +
+ "\"%s\") ";
+ private final static String ORDER_BY_CLAUSE = " ORDER BY c.createdTime";
+
+ public CosmosDBDocumentStoreReader(Configuration conf) {
+ LOG.info("Initializing Cosmos DB DocumentStoreReader...");
+ databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
+ // making CosmosDB Client Singleton
+ if (client == null) {
+ synchronized (this) {
+ if (client == null) {
+ LOG.info("Creating Cosmos DB Client...");
+ client = DocumentStoreUtils.createCosmosDBClient(conf);
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<TimelineDoc> readDocumentList(String collectionName,
+ TimelineReaderContext context, final Class<TimelineDoc> timelineDocClass,
+ long size) throws NoDocumentFoundException {
+ final List<TimelineDoc> result = queryDocuments(collectionName,
+ context, timelineDocClass, size);
+ if (result.size() > 0) {
+ return result;
+ }
+ throw new NoDocumentFoundException("No documents were found while " +
+ "querying Collection : " + collectionName);
+ }
+
+ @Override
+ public Set<String> fetchEntityTypes(String collectionName,
+ TimelineReaderContext context) {
+ StringBuilder queryStrBuilder = new StringBuilder();
+ queryStrBuilder.append(
+ String.format(SELECT_DISTINCT_TYPES_FROM_COLLECTION, collectionName));
+ String sqlQuery = addPredicates(context, collectionName, queryStrBuilder);
+
+ LOG.debug("Querying Collection : {} , with query {}", collectionName,
+ sqlQuery);
+
+ Set<String> entityTypes = new HashSet<>();
+ Iterator<Document> documentIterator = client.queryDocuments(
+ String.format(COLLECTION_LINK, databaseName, collectionName),
+ sqlQuery, null).getQueryIterator();
+ while (documentIterator.hasNext()) {
+ Document document = documentIterator.next();
+ entityTypes.add(document.getString(ENTITY_TYPE_COLUMN));
+ }
+ return entityTypes;
+ }
+
+ @Override
+ public TimelineDoc readDocument(String collectionName, TimelineReaderContext
+ context, final Class<TimelineDoc> timelineDocClass)
+ throws NoDocumentFoundException {
+ final List<TimelineDoc> result = queryDocuments(collectionName,
+ context, timelineDocClass, DEFAULT_DOCUMENTS_SIZE);
+ if(result.size() > 0) {
+ return result.get(0);
+ }
+ throw new NoDocumentFoundException("No documents were found while " +
+ "querying Collection : " + collectionName);
+ }
+
+ private List<TimelineDoc> queryDocuments(String collectionName,
+ TimelineReaderContext context, final Class<TimelineDoc> docClass,
+ final long maxDocumentsSize) {
+ final String sqlQuery = buildQueryWithPredicates(context, collectionName,
+ maxDocumentsSize);
+ List<TimelineDoc> timelineDocs = new ArrayList<>();
+ LOG.debug("Querying Collection : {} , with query {}", collectionName,
+ sqlQuery);
+
+ FeedOptions feedOptions = new FeedOptions();
+ feedOptions.setPageSize((int) maxDocumentsSize);
+ Iterator<Document> documentIterator = client.queryDocuments(
+ String.format(COLLECTION_LINK, databaseName, collectionName),
+ sqlQuery, feedOptions).getQueryIterator();
+ while (documentIterator.hasNext()) {
+ Document document = documentIterator.next();
+ TimelineDoc resultDoc = document.toObject(docClass);
+ if (resultDoc.getCreatedTime() == 0 &&
+ document.getTimestamp() != null) {
+ resultDoc.setCreatedTime(document.getTimestamp().getTime());
+ }
+ timelineDocs.add(resultDoc);
+ }
+ return timelineDocs;
+ }
+
+ private String buildQueryWithPredicates(TimelineReaderContext context,
+ String collectionName, long size) {
+ StringBuilder queryStrBuilder = new StringBuilder();
+ if (size == -1) {
+ queryStrBuilder.append(String.format(SELECT_ALL_FROM_COLLECTION,
+ collectionName));
+ } else {
+ queryStrBuilder.append(String.format(SELECT_TOP_FROM_COLLECTION, size,
+ collectionName));
+ }
+
+ return addPredicates(context, collectionName, queryStrBuilder);
+ }
+
+ private String addPredicates(TimelineReaderContext context,
+ String collectionName, StringBuilder queryStrBuilder) {
+ boolean hasPredicate = false;
+
+ queryStrBuilder.append(WHERE_CLAUSE);
+
+ if (context.getClusterId() != null) {
+ hasPredicate = true;
+ queryStrBuilder.append(String.format(CONTAINS_FUNC_FOR_ID,
+ context.getClusterId()));
+ }
+ if (context.getUserId() != null) {
+ hasPredicate = true;
+ queryStrBuilder.append(AND_OPERATOR)
+ .append(String.format(CONTAINS_FUNC_FOR_ID, context.getUserId()));
+ }
+ if (context.getFlowName() != null) {
+ hasPredicate = true;
+ queryStrBuilder.append(AND_OPERATOR)
+ .append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowName()));
+ }
+ if (context.getAppId() != null) {
+ hasPredicate = true;
+ queryStrBuilder.append(AND_OPERATOR)
+ .append(String.format(CONTAINS_FUNC_FOR_ID, context.getAppId()));
+ }
+ if (context.getEntityId() != null) {
+ hasPredicate = true;
+ queryStrBuilder.append(AND_OPERATOR)
+ .append(String.format(CONTAINS_FUNC_FOR_ID, context.getEntityId()));
+ }
+ if (context.getFlowRunId() != null) {
+ hasPredicate = true;
+ queryStrBuilder.append(AND_OPERATOR)
+ .append(String.format(CONTAINS_FUNC_FOR_ID, context.getFlowRunId()));
+ }
+ if (context.getEntityType() != null){
+ hasPredicate = true;
+ queryStrBuilder.append(AND_OPERATOR)
+ .append(String.format(CONTAINS_FUNC_FOR_TYPE,
+ context.getEntityType()));
+ }
+
+ if (hasPredicate) {
+ queryStrBuilder.append(ORDER_BY_CLAUSE);
+ LOG.debug("CosmosDB Sql Query with predicates : {}", queryStrBuilder);
+ return queryStrBuilder.toString();
+ }
+ throw new IllegalArgumentException("The TimelineReaderContext does not " +
+ "have enough information to query documents for Collection : " +
+ collectionName);
+ }
+
+ @Override
+ public synchronized void close() {
+ if (client != null) {
+ LOG.info("Closing Cosmos DB Client...");
+ client.close();
+ client = null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/package-info.java
new file mode 100644
index 0000000..3a20892
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/cosmosdb/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore
+ * .reader.cosmosdb contains the DocumentStore reader implementation for
+ * CosmosDB.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.cosmosdb;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/package-info.java
new file mode 100644
index 0000000..e36bf51
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader
+ * contains the implementation of different DocumentStore reader clients
+ * for DocumentVendor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DocumentStoreWriter.java
new file mode 100755
index 0000000..31b2710
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DocumentStoreWriter.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer;
+
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+
+/**
+ * Every {@link DocumentStoreVendor} have to implement this for creating
+ * writer to its backend.
+ */
+public interface DocumentStoreWriter<Document> extends AutoCloseable {
+
+ void createDatabase();
+
+ void createCollection(String collectionName);
+
+ void writeDocument(Document document, CollectionType collectionType);
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/TimelineCollectionWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/TimelineCollectionWriter.java
new file mode 100755
index 0000000..5c3c56d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/TimelineCollectionWriter.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is a generic Collection Writer that can be used for writing documents
+ * belonging to different {@link CollectionType} under a specific
+ * {@link DocumentStoreVendor} backend.
+ */
+public class TimelineCollectionWriter<Document extends TimelineDocument> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TimelineCollectionWriter.class);
+
+ private final static String DOCUMENT_BUFFER_SIZE_CONF =
+ "yarn.timeline-service.document-buffer.size";
+ private static final int DEFAULT_BUFFER_SIZE = 1024;
+ private static final int AWAIT_TIMEOUT_SECS = 5;
+ private static final PerNodeAggTimelineCollectorMetrics METRICS =
+ PerNodeAggTimelineCollectorMetrics.getInstance();
+
+ private final CollectionType collectionType;
+ private final DocumentStoreWriter<Document> documentStoreWriter;
+ private final Map<String, Document> documentsBuffer;
+ private final int maxBufferSize;
+ private final ScheduledExecutorService scheduledDocumentsFlusher;
+ private final ExecutorService documentsBufferFullFlusher;
+
+ public TimelineCollectionWriter(CollectionType collectionType,
+ Configuration conf) throws YarnException {
+ LOG.info("Initializing TimelineCollectionWriter for collection type : {}",
+ collectionType);
+ int flushIntervalSecs = conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
+ YarnConfiguration
+ .DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
+ maxBufferSize = conf.getInt(DOCUMENT_BUFFER_SIZE_CONF, DEFAULT_BUFFER_SIZE);
+ documentsBuffer = new HashMap<>(maxBufferSize);
+ this.collectionType = collectionType;
+ documentStoreWriter = DocumentStoreFactory.createDocumentStoreWriter(conf);
+ scheduledDocumentsFlusher = Executors.newSingleThreadScheduledExecutor();
+ scheduledDocumentsFlusher.scheduleAtFixedRate(this::flush,
+ flushIntervalSecs, flushIntervalSecs, TimeUnit.SECONDS);
+ documentsBufferFullFlusher = Executors.newSingleThreadExecutor();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void writeDocument(Document timelineDocument) {
+ /*
+ * The DocumentBuffer is used to buffer the most frequently used
+ * documents for performing upserts on them, whenever either due to
+ * buffer gets fulled or the scheduledDocumentsFlusher
+ * invokes flush() periodically, all the buffered documents would be written
+ * to DocumentStore in a background thread.
+ */
+ long startTime = Time.monotonicNow();
+
+ synchronized(documentsBuffer) {
+ //if buffer is full copy to flushBuffer in order to flush
+ if (documentsBuffer.size() == maxBufferSize) {
+ final Map<String, Document> flushedBuffer = copyToFlushBuffer();
+ //flush all documents from flushBuffer in background
+ documentsBufferFullFlusher.execute(() -> flush(flushedBuffer));
+ }
+ Document prevDocument = documentsBuffer.get(timelineDocument.getId());
+ // check if Document exists inside documentsBuffer
+ if (prevDocument != null) {
+ prevDocument.merge(timelineDocument);
+ } else { // else treat this as a new document
+ prevDocument = timelineDocument;
+ }
+ documentsBuffer.put(prevDocument.getId(), prevDocument);
+ }
+ METRICS.addAsyncPutEntitiesLatency(Time.monotonicNow() - startTime,
+ true);
+ }
+
+ private Map<String, Document> copyToFlushBuffer() {
+ Map<String, Document> flushBuffer = new HashMap<>();
+ synchronized(documentsBuffer) {
+ if (documentsBuffer.size() > 0) {
+ flushBuffer.putAll(documentsBuffer);
+ documentsBuffer.clear();
+ }
+ }
+ return flushBuffer;
+ }
+
+ private void flush(Map<String, Document> flushBuffer) {
+ for (Document document : flushBuffer.values()) {
+ documentStoreWriter.writeDocument(document, collectionType);
+ }
+ }
+
+ public void flush() {
+ flush(copyToFlushBuffer());
+ }
+
+ public void close() throws Exception {
+ scheduledDocumentsFlusher.shutdown();
+ documentsBufferFullFlusher.shutdown();
+
+ flush();
+
+ scheduledDocumentsFlusher.awaitTermination(
+ AWAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
+ documentsBufferFullFlusher.awaitTermination(
+ AWAIT_TIMEOUT_SECS, TimeUnit.SECONDS);
+ documentStoreWriter.close();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java
new file mode 100755
index 0000000..b345276
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/CosmosDBDocumentStoreWriter.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
+
+
+import com.microsoft.azure.documentdb.AccessCondition;
+import com.microsoft.azure.documentdb.AccessConditionType;
+import com.microsoft.azure.documentdb.Database;
+import com.microsoft.azure.documentdb.Document;
+import com.microsoft.azure.documentdb.DocumentClient;
+import com.microsoft.azure.documentdb.DocumentClientException;
+import com.microsoft.azure.documentdb.DocumentCollection;
+import com.microsoft.azure.documentdb.RequestOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.server.timelineservice.metrics.PerNodeAggTimelineCollectorMetrics;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the Document Store Writer implementation for
+ * {@link DocumentStoreVendor#COSMOS_DB}.
+ */
+public class CosmosDBDocumentStoreWriter<TimelineDoc extends TimelineDocument>
+ implements DocumentStoreWriter<TimelineDoc> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(CosmosDBDocumentStoreWriter.class);
+
+ private static DocumentClient client;
+ private final String databaseName;
+ private static final PerNodeAggTimelineCollectorMetrics METRICS =
+ PerNodeAggTimelineCollectorMetrics.getInstance();
+ private static final String DATABASE_LINK = "/dbs/%s";
+ private static final String COLLECTION_LINK = DATABASE_LINK + "/colls/%s";
+ private static final String DOCUMENT_LINK = COLLECTION_LINK + "/docs/%s";
+
+ public CosmosDBDocumentStoreWriter(Configuration conf) {
+ LOG.info("Initializing Cosmos DB DocumentStoreWriter...");
+ databaseName = DocumentStoreUtils.getCosmosDBDatabaseName(conf);
+ // making CosmosDB Client Singleton
+ if (client == null) {
+ synchronized (this) {
+ if (client == null) {
+ LOG.info("Creating Cosmos DB Client...");
+ client = DocumentStoreUtils.createCosmosDBClient(conf);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void createDatabase() {
+ try {
+ client.readDatabase(String.format(
+ DATABASE_LINK, databaseName), new RequestOptions());
+ LOG.info("Database {} already exists.", databaseName);
+ } catch (DocumentClientException docExceptionOnRead) {
+ if (docExceptionOnRead.getStatusCode() == 404) {
+ LOG.info("Creating new Database : {}", databaseName);
+ Database databaseDefinition = new Database();
+ databaseDefinition.setId(databaseName);
+ try {
+ client.createDatabase(databaseDefinition, new RequestOptions());
+ } catch (DocumentClientException docExceptionOnCreate) {
+ LOG.error("Unable to create new Database : {}", databaseName,
+ docExceptionOnCreate);
+ }
+ } else {
+ LOG.error("Error while reading Database : {}", databaseName,
+ docExceptionOnRead);
+ }
+ }
+ }
+
+ @Override
+ public void createCollection(final String collectionName) {
+ LOG.info("Creating Timeline Collection : {} for Database : {}",
+ collectionName, databaseName);
+ try {
+ client.readCollection(String.format(COLLECTION_LINK, databaseName,
+ collectionName), new RequestOptions());
+ LOG.info("Collection {} already exists.", collectionName);
+ } catch (DocumentClientException docExceptionOnRead) {
+ if (docExceptionOnRead.getStatusCode() == 404) {
+ DocumentCollection collection = new DocumentCollection();
+ collection.setId(collectionName);
+ LOG.info("Creating collection {} under Database {}",
+ collectionName, databaseName);
+ try {
+ client.createCollection(
+ String.format(DATABASE_LINK, databaseName),
+ collection, new RequestOptions());
+ } catch (DocumentClientException docExceptionOnCreate) {
+ LOG.error("Unable to create Collection : {} under Database : {}",
+ collectionName, databaseName, docExceptionOnCreate);
+ }
+ } else {
+ LOG.error("Error while reading Collection : {} under Database : {}",
+ collectionName, databaseName, docExceptionOnRead);
+ }
+ }
+ }
+
+ @Override
+ public void writeDocument(final TimelineDoc timelineDoc,
+ final CollectionType collectionType) {
+ LOG.debug("Upserting document under collection : {} with entity type : " +
+ "{} under Database {}", databaseName, timelineDoc.getType(),
+ collectionType.getCollectionName());
+ boolean succeeded = false;
+ long startTime = Time.monotonicNow();
+ try {
+ upsertDocument(collectionType, timelineDoc);
+ succeeded = true;
+ } catch (Exception e) {
+ LOG.error("Unable to perform upsert for Document Id : {} under " +
+ "Collection : {} under Database {}", timelineDoc.getId(),
+ collectionType.getCollectionName(), databaseName, e);
+ } finally {
+ long latency = Time.monotonicNow() - startTime;
+ METRICS.addPutEntitiesLatency(latency, succeeded);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void upsertDocument(final CollectionType collectionType,
+ final TimelineDoc timelineDoc) {
+ final String collectionLink = String.format(COLLECTION_LINK, databaseName,
+ collectionType.getCollectionName());
+ RequestOptions requestOptions = new RequestOptions();
+ AccessCondition accessCondition = new AccessCondition();
+ StringBuilder eTagStrBuilder = new StringBuilder();
+
+ TimelineDoc updatedTimelineDoc = applyUpdatesOnPrevDoc(collectionType,
+ timelineDoc, eTagStrBuilder);
+
+ accessCondition.setCondition(eTagStrBuilder.toString());
+ accessCondition.setType(AccessConditionType.IfMatch);
+ requestOptions.setAccessCondition(accessCondition);
+
+ try {
+ client.upsertDocument(collectionLink, updatedTimelineDoc,
+ requestOptions, true);
+ LOG.debug("Successfully wrote doc with id : {} and type : {} under " +
+ "Database : {}", timelineDoc.getId(), timelineDoc.getType(),
+ databaseName);
+ } catch (DocumentClientException e) {
+ if (e.getStatusCode() == 409) {
+ LOG.warn("There was a conflict while upserting, hence retrying...", e);
+ upsertDocument(collectionType, updatedTimelineDoc);
+ }
+ LOG.error("Error while upserting Collection : {} with Doc Id : {} under" +
+ " Database : {}", collectionType.getCollectionName(),
+ updatedTimelineDoc.getId(), databaseName, e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private TimelineDoc applyUpdatesOnPrevDoc(CollectionType collectionType,
+ TimelineDoc timelineDoc, StringBuilder eTagStrBuilder) {
+ TimelineDoc prevDocument = fetchLatestDoc(collectionType,
+ timelineDoc.getId(), eTagStrBuilder);
+ if (prevDocument != null) {
+ prevDocument.merge(timelineDoc);
+ timelineDoc = prevDocument;
+ }
+ return timelineDoc;
+ }
+
+ @SuppressWarnings("unchecked")
+ private TimelineDoc fetchLatestDoc(final CollectionType collectionType,
+ final String documentId, StringBuilder eTagStrBuilder) {
+ final String documentLink = String.format(DOCUMENT_LINK, databaseName,
+ collectionType.getCollectionName(), documentId);
+ try {
+ Document latestDocument = client.readDocument(documentLink, new
+ RequestOptions()).getResource();
+ TimelineDoc timelineDoc;
+ switch (collectionType) {
+ case FLOW_RUN:
+ timelineDoc = (TimelineDoc) latestDocument.toObject(
+ FlowRunDocument.class);
+ break;
+ case FLOW_ACTIVITY:
+ timelineDoc = (TimelineDoc) latestDocument.toObject(FlowActivityDocument
+ .class);
+ break;
+ default:
+ timelineDoc = (TimelineDoc) latestDocument.toObject(
+ TimelineEntityDocument.class);
+ }
+ eTagStrBuilder.append(latestDocument.getETag());
+ return timelineDoc;
+ } catch (Exception e) {
+ LOG.debug("No previous Document found with id : {} for Collection" +
+ " : {} under Database : {}", documentId, collectionType
+ .getCollectionName(), databaseName);
+ return null;
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ if (client != null) {
+ LOG.info("Closing Cosmos DB Client...");
+ client.close();
+ client = null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/package-info.java
new file mode 100644
index 0000000..3f2165f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/cosmosdb/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer
+ * .cosmosdb contains the DocumentStore writer implementation for CosmosDB.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.cosmosdb;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/package-info.java
new file mode 100644
index 0000000..544d321
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer
+ * contains the implementation of different DocumentStore writer clients
+ * for DocumentVendor.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTestUtils.java
new file mode 100755
index 0000000..cf57085
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTestUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * This is util class for baking sample TimelineEntities data for test.
+ */
+public final class DocumentStoreTestUtils {
+
+ private DocumentStoreTestUtils(){}
+
+
+ public static List<TimelineEntity> bakeTimelineEntities()
+ throws IOException {
+ String jsonStr = IOUtils.toString(
+ DocumentStoreTestUtils.class.getClassLoader().getResourceAsStream(
+ "documents/timeline-entities.json"), "UTF-8");
+ return JsonUtils.fromJson(jsonStr,
+ new TypeReference<List<TimelineEntity>>(){});
+ }
+
+ public static List<TimelineEntityDocument> bakeYarnAppTimelineEntities()
+ throws IOException {
+ String jsonStr = IOUtils.toString(
+ DocumentStoreTestUtils.class.getClassLoader().getResourceAsStream(
+ "documents/test-timeline-entities-doc.json"), "UTF-8");
+ return JsonUtils.fromJson(jsonStr,
+ new TypeReference<List<TimelineEntityDocument>>() {});
+ }
+
+ public static TimelineEntityDocument bakeTimelineEntityDoc()
+ throws IOException {
+ String jsonStr = IOUtils.toString(
+ DocumentStoreTestUtils.class.getClassLoader().getResourceAsStream(
+ "documents/timeline-app-doc.json"), "UTF-8");
+ return JsonUtils.fromJson(jsonStr,
+ new TypeReference<TimelineEntityDocument>() {});
+ }
+
+ public static FlowActivityDocument bakeFlowActivityDoc() throws IOException {
+ String jsonStr = IOUtils.toString(
+ DocumentStoreTestUtils.class.getClassLoader().getResourceAsStream(
+ "documents/flowactivity-doc.json"), "UTF-8");
+ return JsonUtils.fromJson(jsonStr,
+ new TypeReference<FlowActivityDocument>() {});
+ }
+
+ public static FlowRunDocument bakeFlowRunDoc() throws IOException {
+ String jsonStr = IOUtils.toString(
+ DocumentStoreTestUtils.class.getClassLoader().getResourceAsStream(
+ "documents/flowrun-doc.json"), "UTF-8");
+ return JsonUtils.fromJson(jsonStr,
+ new TypeReference<FlowRunDocument>(){});
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/JsonUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/JsonUtils.java
new file mode 100755
index 0000000..c1da4f6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/JsonUtils.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import java.io.IOException;
+
+/**
+ * A simple util class for Json SerDe.
+ */
+public final class JsonUtils {
+
+ private JsonUtils(){}
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ static {
+ OBJECT_MAPPER.configure(
+ DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ /**
+ * Deserialize the Json String to JAVA Object.
+ * @param jsonStr
+ * json string that has to be deserialized
+ * @param type
+ * of JAVA Object
+ * @return JAVA Object after deserialization
+ * @throws IOException if Json String is not valid or error
+ * while deserialization
+ */
+ public static <T> T fromJson(final String jsonStr, final TypeReference type)
+ throws IOException {
+ return OBJECT_MAPPER.readValue(jsonStr, type);
+ }
+
+ public static ObjectMapper getObjectMapper() {
+ return OBJECT_MAPPER;
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreCollectionCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreCollectionCreator.java
new file mode 100644
index 0000000..879b0ad
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreCollectionCreator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DummyDocumentStoreWriter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test case for {@link DocumentStoreCollectionCreator}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreFactory.class)
+public class TestDocumentStoreCollectionCreator {
+
+  private final DocumentStoreWriter<TimelineDocument> documentStoreWriter =
+      new DummyDocumentStoreWriter<>();
+  private final Configuration conf = new Configuration();
+
+  /**
+   * Stubs the static {@link DocumentStoreFactory} so that it returns the
+   * dummy writer and fills in the database name, endpoint and master key.
+   */
+  @Before
+  public void setUp() throws YarnException {
+    PowerMockito.mockStatic(DocumentStoreFactory.class);
+    PowerMockito
+        .when(DocumentStoreFactory.createDocumentStoreWriter(
+            ArgumentMatchers.any(Configuration.class)))
+        .thenReturn(documentStoreWriter);
+
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME,
+        "TestDB");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_ENDPOINT,
+        "https://localhost:443");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_MASTER_KEY,
+        "1234567");
+  }
+
+  /**
+   * Runs schema creation with an empty argument array; the test passes when
+   * no exception is thrown.
+   */
+  @Test
+  public void collectionCreatorTest() {
+    final String[] noArgs = new String[0];
+    new DocumentStoreCollectionCreator().createTimelineSchema(noArgs);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineReaderImpl.java
new file mode 100755
index 0000000..181e134
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineReaderImpl.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DocumentStoreReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.reader.DummyDocumentStoreReader;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Test case for {@link DocumentStoreTimelineReaderImpl}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreFactory.class)
+public class TestDocumentStoreTimelineReaderImpl {
+
+ private final DocumentStoreReader<TimelineDocument> documentStoreReader = new
+ DummyDocumentStoreReader<>();
+ private final List<TimelineEntity> entities = DocumentStoreTestUtils
+ .bakeTimelineEntities();
+ private final TimelineEntityDocument appTimelineEntity =
+ DocumentStoreTestUtils.bakeTimelineEntityDoc();
+
+ private final Configuration conf = new Configuration();
+ private final TimelineReaderContext context = new
+ TimelineReaderContext(null, null, null,
+ 1L, null, null, null);
+ private final DocumentStoreTimelineReaderImpl timelineReader = new
+ DocumentStoreTimelineReaderImpl();
+
+ public TestDocumentStoreTimelineReaderImpl() throws IOException {
+ }
+
+  /**
+   * Stubs the static {@link DocumentStoreFactory} so that it returns
+   * {@code documentStoreReader} and fills in the database name, endpoint
+   * and master key in the test configuration.
+   */
+  @Before
+  public void setUp() throws YarnException {
+    PowerMockito.mockStatic(DocumentStoreFactory.class);
+    PowerMockito
+        .when(DocumentStoreFactory.createDocumentStoreReader(
+            ArgumentMatchers.any(Configuration.class)))
+        .thenReturn(documentStoreReader);
+
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME,
+        "TestDB");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_ENDPOINT,
+        "https://localhost:443");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_MASTER_KEY,
+        "1234567");
+  }
+
+ /**
+ * Expects {@code validateCosmosDBConf} to throw a {@link YarnException}
+ * when it is given a freshly constructed {@link Configuration}.
+ */
+ @Test(expected = YarnException.class)
+ public void testFailOnNoCosmosDBConfigs() throws Exception {
+ DocumentStoreUtils.validateCosmosDBConf(new Configuration());
+ }
+
+  /**
+   * Reads a YARN_APPLICATION entity with an empty set of fields to retrieve
+   * and expects no metrics, events or configs, while the created time and
+   * the number of info entries match {@code appTimelineEntity}.
+   */
+  @Test
+  public void testGetEntity() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve toRetrieve = new TimelineDataToRetrieve();
+    toRetrieve.setFieldsToRetrieve(
+        EnumSet.noneOf(TimelineReader.Field.class));
+
+    TimelineEntity entity = timelineReader.getEntity(context, toRetrieve);
+
+    Assert.assertEquals(appTimelineEntity.getCreatedTime(),
+        entity.getCreatedTime().longValue());
+    Assert.assertEquals(0, entity.getMetrics().size());
+    Assert.assertEquals(0, entity.getEvents().size());
+    Assert.assertEquals(0, entity.getConfigs().size());
+    Assert.assertEquals(appTimelineEntity.getInfo().size(),
+        entity.getInfo().size());
+  }
+
+  /**
+   * Requests only the METRICS field and expects the metrics of
+   * {@code appTimelineEntity} to be returned while events and configs stay
+   * empty.
+   */
+  @Test
+  public void testGetEntityCustomField() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve toRetrieve = new TimelineDataToRetrieve();
+    toRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.METRICS);
+
+    TimelineEntity entity = timelineReader.getEntity(context, toRetrieve);
+
+    Assert.assertEquals(appTimelineEntity.getCreatedTime(),
+        entity.getCreatedTime().longValue());
+    Assert.assertEquals(appTimelineEntity.getMetrics().size(),
+        entity.getMetrics().size());
+    Assert.assertEquals(0, entity.getEvents().size());
+    Assert.assertEquals(0, entity.getConfigs().size());
+    Assert.assertEquals(appTimelineEntity.getInfo().size(),
+        entity.getInfo().size());
+  }
+
+  /**
+   * Requests the ALL field and expects created time, metrics, events,
+   * configs and info of the result to match {@code appTimelineEntity}.
+   */
+  @Test
+  public void testGetEntityAllFields() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve toRetrieve = new TimelineDataToRetrieve();
+    toRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.ALL);
+
+    TimelineEntity entity = timelineReader.getEntity(context, toRetrieve);
+
+    Assert.assertEquals(appTimelineEntity.getCreatedTime(),
+        entity.getCreatedTime().longValue());
+    Assert.assertEquals(appTimelineEntity.getMetrics().size(),
+        entity.getMetrics().size());
+    Assert.assertEquals(appTimelineEntity.getEvents().size(),
+        entity.getEvents().size());
+    Assert.assertEquals(appTimelineEntity.getConfigs().size(),
+        entity.getConfigs().size());
+    Assert.assertEquals(appTimelineEntity.getInfo().size(),
+        entity.getInfo().size());
+  }
+
+  /**
+   * Reads entities with default filters and expects as many results as
+   * there are entries in {@code entities}.
+   */
+  @Test
+  public void testGetAllEntities() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve toRetrieve = new TimelineDataToRetrieve();
+    toRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.ALL);
+    TimelineEntityFilters defaultFilters =
+        new TimelineEntityFilters.Builder().build();
+
+    Set<TimelineEntity> found =
+        timelineReader.getEntities(context, defaultFilters, toRetrieve);
+
+    Assert.assertEquals(entities.size(), found.size());
+  }
+
+  /**
+   * Reads entities with an entity limit of 2 and expects exactly two
+   * results.
+   */
+  @Test
+  public void testGetEntitiesWithLimit() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve toRetrieve = new TimelineDataToRetrieve();
+    TimelineEntityFilters limitFilters =
+        new TimelineEntityFilters.Builder().entityLimit(2L).build();
+
+    Set<TimelineEntity> found =
+        timelineReader.getEntities(context, limitFilters, toRetrieve);
+
+    Assert.assertEquals(2, found.size());
+  }
+
+  /**
+   * Reads entities whose created time window begins and ends at the same
+   * timestamp and expects exactly one result.
+   */
+  @Test
+  public void testGetEntitiesByWindows() throws Exception {
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve toRetrieve = new TimelineDataToRetrieve();
+    TimelineEntityFilters windowFilters = new TimelineEntityFilters.Builder()
+        .createdTimeBegin(1533985554927L)
+        .createTimeEnd(1533985554927L)
+        .build();
+
+    Set<TimelineEntity> found =
+        timelineReader.getEntities(context, windowFilters, toRetrieve);
+
+    Assert.assertEquals(1, found.size());
+  }
+
+  /**
+   * Applies info, config, event and metric filters in turn and checks that
+   * each query returns exactly one entity of the expected type.
+   */
+  @Test
+  public void testGetFilteredEntities() throws Exception {
+
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+    dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.ALL);
+
+    // Get entities based on info filters.
+    TimelineFilterList infoFilterList = new TimelineFilterList();
+    infoFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL,
+            "YARN_APPLICATION_ATTEMPT_FINAL_STATUS", "SUCCEEDED"));
+    Set<TimelineEntity> actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().infoFilters(infoFilterList).build(),
+        dataToRetrieve);
+    assertSingleEntityOfType("info", "YARN_APPLICATION_ATTEMPT",
+        actualEntities);
+
+    // Get entities based on config filters.
+    TimelineFilterList confFilterList = new TimelineFilterList();
+    context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL,
+            "YARN_AM_NODE_LABEL_EXPRESSION", "<DEFAULT_PARTITION>"));
+    actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().configFilters(confFilterList)
+            .build(), dataToRetrieve);
+    assertSingleEntityOfType("config", "YARN_APPLICATION", actualEntities);
+
+    // Get entities based on event filters.
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+    TimelineFilterList eventFilters = new TimelineFilterList();
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL,
+            "CONTAINER_LAUNCHED"));
+    actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().eventFilters(eventFilters).build(),
+        dataToRetrieve);
+    assertSingleEntityOfType("event", "YARN_CONTAINER", actualEntities);
+
+    // Get entities based on metric filters.
+    TimelineFilterList metricFilterList = new TimelineFilterList();
+    metricFilterList.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_OR_EQUAL, "MEMORY", 150298624L));
+    actualEntities = timelineReader.getEntities(context,
+        new TimelineEntityFilters.Builder().metricFilters(metricFilterList)
+            .build(), dataToRetrieve);
+    assertSingleEntityOfType("metric", "YARN_CONTAINER", actualEntities);
+  }
+
+  /**
+   * Asserts that a filtered read returned exactly one entity and that it has
+   * the expected type.
+   *
+   * @param filterKind kind of filter that was applied; it is only used in
+   *                   the failure message
+   * @param expectedType entity type the returned entity must have
+   * @param actualEntities entities returned by the reader
+   */
+  private static void assertSingleEntityOfType(String filterKind,
+      String expectedType, Set<TimelineEntity> actualEntities) {
+    Assert.assertEquals(1, actualEntities.size());
+    for (TimelineEntity entity : actualEntities) {
+      Assert.assertEquals("Incorrect filtering based on " + filterKind
+          + " filters", expectedType, entity.getType());
+    }
+  }
+
+  /**
+   * Reads one entity each for YARN_FLOW_ACTIVITY, YARN_FLOW_RUN and
+   * YARN_APPLICATION, in that order, and expects the returned entity to
+   * carry the requested type.
+   */
+  @Test
+  public void testReadingDifferentEntityTypes() throws Exception {
+    timelineReader.serviceInit(conf);
+    TimelineDataToRetrieve toRetrieve = new TimelineDataToRetrieve();
+
+    final TimelineEntityType[] typesToRead = {
+        TimelineEntityType.YARN_FLOW_ACTIVITY,
+        TimelineEntityType.YARN_FLOW_RUN,
+        TimelineEntityType.YARN_APPLICATION,
+    };
+    for (TimelineEntityType type : typesToRead) {
+      context.setEntityType(type.toString());
+      TimelineEntity entity = timelineReader.getEntity(context, toRetrieve);
+
+      Assert.assertEquals(type.toString(), entity.getType());
+    }
+  }
+
+  /**
+   * Reads the available entity types and expects YARN_CONTAINER and
+   * YARN_APPLICATION_ATTEMPT to be among them.
+   */
+  @Test
+  public void testReadingAllEntityTypes() throws Exception {
+    timelineReader.serviceInit(conf);
+    context.setEntityType(TimelineEntityType.YARN_CONTAINER.toString());
+
+    Set<String> types = timelineReader.getEntityTypes(context);
+
+    String containerType = TimelineEntityType.YARN_CONTAINER.toString();
+    Assert.assertTrue(types.contains(containerType));
+    String attemptType =
+        TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString();
+    Assert.assertTrue(types.contains(attemptType));
+  }
+
+ /**
+ * Checks how prefix filters in "metrics to retrieve" affect the metrics
+ * returned for a YARN_APPLICATION entity, for both the OR and the AND
+ * operator. The same filter list is mutated between reads, so the order
+ * of the statements matters.
+ */
+ @Test
+ public void testMetricsToRetrieve() throws Exception {
+
+ timelineReader.serviceInit(conf);
+
+ TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+ dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.METRICS);
+ TimelineFilterList timelineFilterList = new TimelineFilterList();
+
+ //testing metrics prefix for OR condition
+ // Only the "NOTHING" prefix is set: no metric is expected back.
+ timelineFilterList.setOperator(TimelineFilterList.Operator.OR);
+ timelineFilterList.addFilter(new TimelinePrefixFilter(
+ TimelineCompareOp.EQUAL, "NOTHING"));
+ dataToRetrieve.setMetricsToRetrieve(timelineFilterList);
+
+ context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+ TimelineEntity timelineEntity = timelineReader.getEntity(context,
+ dataToRetrieve);
+ Assert.assertEquals(0, timelineEntity.getMetrics().size());
+
+ // With a second prefix added under OR, at least one metric is expected.
+ timelineFilterList.addFilter(new TimelinePrefixFilter(
+ TimelineCompareOp.EQUAL,
+ "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED"));
+ dataToRetrieve.setMetricsToRetrieve(timelineFilterList);
+
+ context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+ timelineEntity = timelineReader.getEntity(context,
+ dataToRetrieve);
+ Assert.assertTrue(timelineEntity.getMetrics().size() > 0);
+
+ //testing metrics prefix for AND condition
+ // Both prefixes are still in the list: under AND nothing is expected.
+ timelineFilterList.setOperator(TimelineFilterList.Operator.AND);
+ timelineEntity = timelineReader.getEntity(context,
+ dataToRetrieve);
+ Assert.assertEquals(0, timelineEntity.getMetrics().size());
+
+ // Drop the filter at index 0 (presumably the "NOTHING" prefix added
+ // first - TODO confirm the getter exposes the same list instance).
+ dataToRetrieve.getMetricsToRetrieve().getFilterList().remove(0);
+ context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+ timelineEntity = timelineReader.getEntity(context,
+ dataToRetrieve);
+ Assert.assertTrue(timelineEntity.getMetrics().size() > 0);
+ }
+
+ /**
+ * Checks how prefix filters in "configs to retrieve" affect the configs
+ * returned for a YARN_APPLICATION entity, for both the OR and the AND
+ * operator. The same filter list is mutated between reads, so the order
+ * of the statements matters.
+ */
+ @Test
+ public void testConfigsToRetrieve() throws Exception {
+
+ timelineReader.serviceInit(conf);
+
+ TimelineDataToRetrieve dataToRetrieve = new TimelineDataToRetrieve();
+ dataToRetrieve.getFieldsToRetrieve().add(TimelineReader.Field.CONFIGS);
+ TimelineFilterList timelineFilterList = new TimelineFilterList();
+
+ // testing configs prefix for OR condition
+ // Only the "NOTHING" prefix is set: no config is expected back.
+ timelineFilterList.setOperator(TimelineFilterList.Operator.OR);
+ timelineFilterList.addFilter(new TimelinePrefixFilter(
+ TimelineCompareOp.EQUAL, "NOTHING"));
+ dataToRetrieve.setConfsToRetrieve(timelineFilterList);
+
+ context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+ TimelineEntity timelineEntity = timelineReader.getEntity(context,
+ dataToRetrieve);
+ Assert.assertEquals(0, timelineEntity.getConfigs().size());
+
+ // With a second prefix added under OR, at least one config is expected.
+ timelineFilterList.addFilter(new TimelinePrefixFilter(
+ TimelineCompareOp.EQUAL, "YARN_AM_NODE_LABEL_EXPRESSION"));
+ dataToRetrieve.setConfsToRetrieve(timelineFilterList);
+
+ context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+ timelineEntity = timelineReader.getEntity(context,
+ dataToRetrieve);
+ Assert.assertTrue(timelineEntity.getConfigs().size() > 0);
+
+ // testing configs prefix for AND condition
+ // Both prefixes are still in the list: under AND nothing is expected.
+ timelineFilterList.setOperator(TimelineFilterList.Operator.AND);
+ timelineEntity = timelineReader.getEntity(context,
+ dataToRetrieve);
+ Assert.assertEquals(0, timelineEntity.getConfigs().size());
+
+ // Drop the filter at index 0 (presumably the "NOTHING" prefix added
+ // first - TODO confirm the getter exposes the same list instance).
+ dataToRetrieve.getConfsToRetrieve().getFilterList().remove(0);
+ context.setEntityType(TimelineEntityType.YARN_APPLICATION.toString());
+ timelineEntity = timelineReader.getEntity(context,
+ dataToRetrieve);
+ Assert.assertTrue(timelineEntity.getConfigs().size() > 0);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineWriterImpl.java
new file mode 100755
index 0000000..b654de8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/TestDocumentStoreTimelineWriterImpl.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreFactory;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DocumentStoreWriter;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.writer.DummyDocumentStoreWriter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test case for {@link DocumentStoreTimelineWriterImpl}.
+ *
+ * <p>{@link DocumentStoreFactory} is statically mocked in {@link #setUp()} so
+ * that it hands out a {@link DummyDocumentStoreWriter} for any configuration.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DocumentStoreFactory.class)
+public class TestDocumentStoreTimelineWriterImpl {
+
+  private final DocumentStoreWriter<TimelineDocument> documentStoreWriter = new
+      DummyDocumentStoreWriter<>();
+  private final Configuration conf = new Configuration();
+
+  /**
+   * Fills the shared configuration with placeholder database and CosmosDB
+   * settings and stubs the static factory to return the dummy writer.
+   *
+   * @throws YarnException checked exception propagated from the factory call
+   *     that is being stubbed.
+   */
+  @Before
+  public void setUp() throws YarnException {
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_DOCUMENTSTORE_DATABASE_NAME,
+        "TestDB");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_ENDPOINT,
+        "https://localhost:443");
+    conf.set(DocumentStoreUtils.TIMELINE_SERVICE_COSMOSDB_MASTER_KEY,
+        "1234567");
+    PowerMockito.mockStatic(DocumentStoreFactory.class);
+    PowerMockito.when(DocumentStoreFactory.createDocumentStoreWriter(
+        ArgumentMatchers.any(Configuration.class)))
+        .thenReturn(documentStoreWriter);
+  }
+
+  /**
+   * Validating a configuration that has none of the settings applied in
+   * {@link #setUp()} is expected to fail with a {@link YarnException}.
+   */
+  @Test(expected = YarnException.class)
+  public void testFailOnNoCosmosDBConfigs() throws Exception {
+    DocumentStoreUtils.validateCosmosDBConf(new Configuration());
+  }
+
+  /**
+   * Initialises the writer with the test configuration, writes the baked
+   * test entities through it and then checks that the mocked factory was
+   * actually asked for a document store writer.
+   */
+  @Test
+  public void testWritingToCosmosDB() throws Exception {
+    DocumentStoreTimelineWriterImpl timelineWriter = new
+        DocumentStoreTimelineWriterImpl();
+
+    timelineWriter.serviceInit(conf);
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntities(DocumentStoreTestUtils.bakeTimelineEntities());
+    entities.addEntity(DocumentStoreTestUtils.bakeTimelineEntityDoc()
+        .fetchTimelineEntity());
+
+    TimelineCollectorContext context = new TimelineCollectorContext();
+    context.setFlowName("TestFlow");
+    context.setAppId("DUMMY_APP_ID");
+    context.setClusterId("yarn_cluster");
+    context.setUserId("test_user");
+    timelineWriter.write(context, entities,
+        UserGroupInformation.createRemoteUser("test_user"));
+
+    // verifyStatic() only arms verification; the check itself is performed
+    // on the next call to the mocked static method, so the two statements
+    // must stay together. atLeastOnce() is used because the number of
+    // writers requested by the implementation is not visible from this test.
+    // NOTE(review): assumes the factory is invoked during serviceInit() or
+    // write() - confirm against DocumentStoreTimelineWriterImpl.
+    PowerMockito.verifyStatic(DocumentStoreFactory.class,
+        Mockito.atLeastOnce());
+    DocumentStoreFactory.createDocumentStoreWriter(
+        ArgumentMatchers.any(Configuration.class));
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/TestDocumentOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/TestDocumentOperations.java
new file mode 100755
index 0000000..0779431
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/collection/TestDocumentOperations.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.collection;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineMetricSubDoc;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Timeline Entity Document merge and aggregation test.
+ *
+ * <p>Each test merges a document baked by {@link DocumentStoreTestUtils} into
+ * a freshly constructed document and compares the two afterwards.
+ */
+public class TestDocumentOperations {
+
+  private static final String MEMORY_ID = "MEMORY";
+  private static final String FLOW_NAME = "DistributedShell";
+  private static final String FLOW_VERSION = "1";
+
+  /**
+   * Merging a baked entity document into an empty one must leave both with
+   * the same number of info, metric, event, config and relation entries.
+   */
+  @Test
+  public void testTimelineEntityDocMergeOperation() throws IOException {
+    TimelineEntityDocument actualEntityDoc =
+        new TimelineEntityDocument();
+    TimelineEntityDocument expectedEntityDoc =
+        DocumentStoreTestUtils.bakeTimelineEntityDoc();
+
+    // State of a freshly constructed document: a single info entry and
+    // nothing else.
+    Assert.assertEquals(1, actualEntityDoc.getInfo().size());
+    Assert.assertEquals(0, actualEntityDoc.getMetrics().size());
+    Assert.assertEquals(0, actualEntityDoc.getEvents().size());
+    Assert.assertEquals(0, actualEntityDoc.getConfigs().size());
+    Assert.assertEquals(0, actualEntityDoc.getIsRelatedToEntities().size());
+    Assert.assertEquals(0, actualEntityDoc.getRelatesToEntities().size());
+
+    actualEntityDoc.merge(expectedEntityDoc);
+
+    Assert.assertEquals(expectedEntityDoc.getInfo().size(),
+        actualEntityDoc.getInfo().size());
+    Assert.assertEquals(expectedEntityDoc.getMetrics().size(),
+        actualEntityDoc.getMetrics().size());
+    Assert.assertEquals(expectedEntityDoc.getEvents().size(),
+        actualEntityDoc.getEvents().size());
+    Assert.assertEquals(expectedEntityDoc.getConfigs().size(),
+        actualEntityDoc.getConfigs().size());
+    // Each relation map is compared with its own counterpart.
+    Assert.assertEquals(expectedEntityDoc.getIsRelatedToEntities().size(),
+        actualEntityDoc.getIsRelatedToEntities().size());
+    Assert.assertEquals(expectedEntityDoc.getRelatesToEntities().size(),
+        actualEntityDoc.getRelatesToEntities().size());
+  }
+
+  /**
+   * Merging a baked flow activity document into an empty one must copy its
+   * fields, and a second merge after adding one more flow activity to the
+   * source must keep both documents in agreement.
+   */
+  @Test
+  public void testFlowActivityDocMergeOperation() throws IOException {
+    FlowActivityDocument actualFlowActivityDoc = new FlowActivityDocument();
+    FlowActivityDocument expectedFlowActivityDoc =
+        DocumentStoreTestUtils.bakeFlowActivityDoc();
+
+    // State of a freshly constructed document: only the type is set.
+    Assert.assertEquals(0, actualFlowActivityDoc.getDayTimestamp());
+    Assert.assertEquals(0, actualFlowActivityDoc.getFlowActivities().size());
+    Assert.assertNull(actualFlowActivityDoc.getFlowName());
+    Assert.assertEquals(TimelineEntityType.YARN_FLOW_ACTIVITY.toString(),
+        actualFlowActivityDoc.getType());
+    Assert.assertNull(actualFlowActivityDoc.getUser());
+    Assert.assertNull(actualFlowActivityDoc.getId());
+
+    actualFlowActivityDoc.merge(expectedFlowActivityDoc);
+
+    Assert.assertEquals(expectedFlowActivityDoc.getDayTimestamp(),
+        actualFlowActivityDoc.getDayTimestamp());
+    Assert.assertEquals(expectedFlowActivityDoc.getFlowActivities().size(),
+        actualFlowActivityDoc.getFlowActivities().size());
+    Assert.assertEquals(expectedFlowActivityDoc.getFlowName(),
+        actualFlowActivityDoc.getFlowName());
+    Assert.assertEquals(expectedFlowActivityDoc.getType(),
+        actualFlowActivityDoc.getType());
+    Assert.assertEquals(expectedFlowActivityDoc.getUser(),
+        actualFlowActivityDoc.getUser());
+    Assert.assertEquals(expectedFlowActivityDoc.getId(),
+        actualFlowActivityDoc.getId());
+
+    // Add one more activity to the source and merge again.
+    expectedFlowActivityDoc.addFlowActivity(FLOW_NAME,
+        FLOW_VERSION, System.currentTimeMillis());
+
+    actualFlowActivityDoc.merge(expectedFlowActivityDoc);
+
+    Assert.assertEquals(expectedFlowActivityDoc.getDayTimestamp(),
+        actualFlowActivityDoc.getDayTimestamp());
+    Assert.assertEquals(expectedFlowActivityDoc.getFlowActivities().size(),
+        actualFlowActivityDoc.getFlowActivities().size());
+    Assert.assertEquals(expectedFlowActivityDoc.getFlowName(),
+        actualFlowActivityDoc.getFlowName());
+    Assert.assertEquals(expectedFlowActivityDoc.getType(),
+        actualFlowActivityDoc.getType());
+    Assert.assertEquals(expectedFlowActivityDoc.getUser(),
+        actualFlowActivityDoc.getUser());
+    Assert.assertEquals(expectedFlowActivityDoc.getId(),
+        actualFlowActivityDoc.getId());
+  }
+
+  /**
+   * Merging a baked flow run document into an empty one must copy its
+   * fields; merging the same document a second time must double the value
+   * of the MEMORY metric, which is registered with the SUM aggregation op.
+   */
+  @Test
+  public void testFlowRunDocMergeAndAggOperation() throws IOException {
+    FlowRunDocument actualFlowRunDoc = new FlowRunDocument();
+    FlowRunDocument expectedFlowRunDoc = DocumentStoreTestUtils
+        .bakeFlowRunDoc();
+
+    // Attach a single-value, SUM-aggregated MEMORY metric to the source.
+    final long timestamp = System.currentTimeMillis();
+    final long value = 98586624;
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setId(MEMORY_ID);
+    timelineMetric.setType(TimelineMetric.Type.SINGLE_VALUE);
+    timelineMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+    timelineMetric.addValue(timestamp, value);
+    TimelineMetricSubDoc metricSubDoc = new TimelineMetricSubDoc(
+        timelineMetric);
+    expectedFlowRunDoc.getMetrics().put(MEMORY_ID, metricSubDoc);
+
+    // State of a freshly constructed document: only the type is set.
+    Assert.assertNull(actualFlowRunDoc.getClusterId());
+    Assert.assertNull(actualFlowRunDoc.getFlowName());
+    Assert.assertNull(actualFlowRunDoc.getFlowRunId());
+    Assert.assertNull(actualFlowRunDoc.getFlowVersion());
+    Assert.assertNull(actualFlowRunDoc.getId());
+    Assert.assertNull(actualFlowRunDoc.getUsername());
+    Assert.assertEquals(TimelineEntityType.YARN_FLOW_RUN.toString(),
+        actualFlowRunDoc.getType());
+    Assert.assertEquals(0, actualFlowRunDoc.getMinStartTime());
+    Assert.assertEquals(0, actualFlowRunDoc.getMaxEndTime());
+    Assert.assertEquals(0, actualFlowRunDoc.getMetrics().size());
+
+    actualFlowRunDoc.merge(expectedFlowRunDoc);
+
+    Assert.assertEquals(expectedFlowRunDoc.getClusterId(),
+        actualFlowRunDoc.getClusterId());
+    Assert.assertEquals(expectedFlowRunDoc.getFlowName(),
+        actualFlowRunDoc.getFlowName());
+    Assert.assertEquals(expectedFlowRunDoc.getFlowRunId(),
+        actualFlowRunDoc.getFlowRunId());
+    Assert.assertEquals(expectedFlowRunDoc.getFlowVersion(),
+        actualFlowRunDoc.getFlowVersion());
+    Assert.assertEquals(expectedFlowRunDoc.getId(), actualFlowRunDoc.getId());
+    Assert.assertEquals(expectedFlowRunDoc.getUsername(),
+        actualFlowRunDoc.getUsername());
+    Assert.assertEquals(expectedFlowRunDoc.getType(),
+        actualFlowRunDoc.getType());
+    Assert.assertEquals(expectedFlowRunDoc.getMinStartTime(),
+        actualFlowRunDoc.getMinStartTime());
+    Assert.assertEquals(expectedFlowRunDoc.getMaxEndTime(),
+        actualFlowRunDoc.getMaxEndTime());
+    Assert.assertEquals(expectedFlowRunDoc.getMetrics().size(),
+        actualFlowRunDoc.getMetrics().size());
+
+    // Second merge of the same source: the MEMORY value is aggregated.
+    actualFlowRunDoc.merge(expectedFlowRunDoc);
+
+    Assert.assertEquals(value + value, actualFlowRunDoc.getMetrics()
+        .get(MEMORY_ID).getSingleDataValue());
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DummyDocumentStoreReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DummyDocumentStoreReader.java
new file mode 100755
index 0000000..1317392
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/reader/DummyDocumentStoreReader.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.reader;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.DocumentStoreTestUtils;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.entity.TimelineEntityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowactivity.FlowActivityDocument;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.flowrun.FlowRunDocument;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Dummy Document Store Reader for mocking backend calls for unit test.
+ *
+ * <p>All documents are baked once, at construction time, by
+ * {@link DocumentStoreTestUtils}; the collection name and document class
+ * passed to the read methods are ignored and the result is chosen from the
+ * entity type carried by the reader context.
+ */
+public class DummyDocumentStoreReader<TimelineDoc extends TimelineDocument>
+    implements DocumentStoreReader<TimelineDoc> {
+
+  private final TimelineEntityDocument entityDoc;
+  private final List<TimelineEntityDocument> entityDocs;
+  private final FlowRunDocument flowRunDoc;
+  private final FlowActivityDocument flowActivityDoc;
+
+  /**
+   * Bakes the canned documents served by this reader.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} raised while
+   *     baking any of the documents.
+   */
+  public DummyDocumentStoreReader() {
+    try {
+      entityDoc = DocumentStoreTestUtils.bakeTimelineEntityDoc();
+      entityDocs = DocumentStoreTestUtils.bakeYarnAppTimelineEntities();
+      flowRunDoc = DocumentStoreTestUtils.bakeFlowRunDoc();
+      flowActivityDoc = DocumentStoreTestUtils.bakeFlowActivityDoc();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create " +
+          "DummyDocumentStoreReader : ", e);
+    }
+  }
+
+  /**
+   * Returns the canned flow activity or flow run document when the context
+   * asks for that entity type, and the canned entity document otherwise.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public TimelineDoc readDocument(String collectionName, TimelineReaderContext
+      context, Class<TimelineDoc> docClass) {
+    switch (TimelineEntityType.valueOf(context.getEntityType())) {
+    case YARN_FLOW_ACTIVITY:
+      return (TimelineDoc) flowActivityDoc;
+    case YARN_FLOW_RUN:
+      return (TimelineDoc) flowRunDoc;
+    default:
+      return (TimelineDoc) entityDoc;
+    }
+  }
+
+  /**
+   * Returns at most {@code size} canned documents for the entity type in
+   * the context: a one-element list for flow activity, flow run and
+   * application, and the baked entity list for every other type. A
+   * {@code size} of {@code -1} means "no limit" for every entity type.
+   */
+  @Override
+  public List<TimelineDoc> readDocumentList(String collectionName,
+      TimelineReaderContext context, Class<TimelineDoc> docClass, long size) {
+    switch (TimelineEntityType.valueOf(context.getEntityType())) {
+    case YARN_FLOW_ACTIVITY:
+      return firstDocs(listOf(flowActivityDoc), size);
+    case YARN_FLOW_RUN:
+      return firstDocs(listOf(flowRunDoc), size);
+    case YARN_APPLICATION:
+      return firstDocs(listOf(entityDoc), size);
+    default:
+      return firstDocs(entityDocs, size);
+    }
+  }
+
+  /** Wraps a single document in a new mutable list. */
+  private static <T> List<T> listOf(T doc) {
+    List<T> docs = new ArrayList<>();
+    docs.add(doc);
+    return docs;
+  }
+
+  /**
+   * Returns the leading {@code size} elements of {@code docs} as a sub-list
+   * view, or all of them when {@code size} is {@code -1} or exceeds the
+   * number of available documents.
+   */
+  @SuppressWarnings("unchecked")
+  private List<TimelineDoc> firstDocs(List<? extends TimelineDocument> docs,
+      long size) {
+    int end = (size == -1 || size > docs.size()) ? docs.size() : (int) size;
+    // Unchecked: the caller picks the concrete document type through the
+    // entity type in the reader context.
+    return (List<TimelineDoc>) docs.subList(0, end);
+  }
+
+  /** Returns the distinct types of the baked entity documents. */
+  @Override
+  public Set<String> fetchEntityTypes(String collectionName,
+      TimelineReaderContext context) {
+    return entityDocs.stream().map(TimelineEntityDocument::getType)
+        .collect(Collectors.toSet());
+  }
+
+  /** Nothing to release: this reader holds no backend resources. */
+  @Override
+  public void close() {
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DummyDocumentStoreWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DummyDocumentStoreWriter.java
new file mode 100755
index 0000000..764437a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/writer/DummyDocumentStoreWriter.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.documentstore.writer;
+
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.CollectionType;
+import org.apache.hadoop.yarn.server.timelineservice.documentstore.collection.document.TimelineDocument;
+
+/**
+ * Test double for {@link DocumentStoreWriter}: every operation is a no-op,
+ * so unit tests can exercise writer code paths without a backend.
+ *
+ * @param <Doc> type of timeline document accepted by the writer.
+ */
+public class DummyDocumentStoreWriter<Doc extends TimelineDocument>
+    implements DocumentStoreWriter<Doc> {
+
+  /** Does nothing; no database is created. */
+  @Override
+  public void createDatabase() {
+    // no-op
+  }
+
+  /** Does nothing; the collection name is ignored. */
+  @Override
+  public void createCollection(String collectionName) {
+    // no-op
+  }
+
+  /** Does nothing; the document is discarded. */
+  @Override
+  public void writeDocument(Doc timelineDocument,
+      CollectionType collectionType) {
+    // no-op
+  }
+
+  /** Does nothing; there is nothing to release. */
+  @Override
+  public void close() {
+    // no-op
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowactivity-doc.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowactivity-doc.json
new file mode 100755
index 0000000..aa0261d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowactivity-doc.json
@@ -0,0 +1,20 @@
+{
+ "id": "yarn_cluster!1533859200000!test_user!DistributedShell",
+ "type": "YARN_FLOW_ACTIVITY",
+ "flowActivities": [
+ {
+ "flowName": "DistributedShell",
+ "flowVersion": "1",
+ "flowRunId": 1533871039026
+ },
+ {
+ "flowName": "DistributedShell",
+ "flowVersion": "1",
+ "flowRunId": 1533871599510
+ }
+ ],
+ "dayTimestamp": 1533859200000,
+ "user": "test_user",
+ "flowName": "DistributedShell",
+ "createdTime": 1533859200000000
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowrun-doc.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowrun-doc.json
new file mode 100755
index 0000000..79be6b1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/flowrun-doc.json
@@ -0,0 +1,126 @@
+{
+ "id": "yarn_cluster!test_user!DistributedShell!1533871599510",
+ "type": "YARN_FLOW_RUN",
+ "clusterId": "yarn_cluster",
+ "username": "test_user",
+ "flowName": "DistributedShell",
+ "flowRunId": 1533871599510,
+ "flowVersion": "1",
+ "minStartTime": 1533871599510,
+ "maxEndTime": 1533871614645,
+ "metrics": {
+ "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED": {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ },
+ "YARN_APPLICATION_RESOURCE_PREEMPTED_CPU": {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_RESOURCE_PREEMPTED_CPU",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ },
+ "YARN_APPLICATION_CPU_PREEMPT_METRIC": {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_CPU_PREEMPT_METRIC",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ },
+ "YARN_APPLICATION_MEMORY": {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 19858,
+ "id": "YARN_APPLICATION_MEMORY",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 19858
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 19858
+ }
+ },
+ "YARN_APPLICATION_MEM_PREEMPT_METRIC": {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_MEM_PREEMPT_METRIC",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ },
+ "YARN_APPLICATION_CPU": {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 19,
+ "id": "YARN_APPLICATION_CPU",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 19
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 19
+ }
+ },
+ "YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY": {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ },
+ "YARN_APPLICATION_AM_CONTAINER_PREEMPTED": {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_AM_CONTAINER_PREEMPTED",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ }
+ },
+ "createdTime": 1533871599510
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/test-timeline-entities-doc.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/test-timeline-entities-doc.json
new file mode 100755
index 0000000..c2cd8c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/test-timeline-entities-doc.json
@@ -0,0 +1,185 @@
+[
+ {
+ "context": {
+ "clusterId": "yarn_cluster",
+ "userId": "test_user",
+ "flowName": "DistributedShell",
+ "flowRunId": 1533985547564,
+ "appId": "application_1533985489663_0001"
+ },
+ "flowVersion": "1",
+ "subApplicationUser": "test_user",
+ "metrics": {},
+ "events": {
+ "YARN_RM_CONTAINER_REQUESTED_TYPE": [
+ {
+ "valid": true,
+ "id": "YARN_RM_CONTAINER_REQUESTED_TYPE",
+ "timestamp": 1533985547824,
+ "info": {
+ "YARN_RM_CONTAINER_ALLOCATION_REQUEST_ID": 0
+ }
+ }
+ ],
+ "YARN_APPLICATION_ATTEMPT_FINISHED": [
+ {
+ "valid": true,
+ "id": "YARN_APPLICATION_ATTEMPT_FINISHED",
+ "timestamp": 1533985561254,
+ "info": {}
+ }
+ ],
+ "YARN_APPLICATION_ATTEMPT_REGISTERED": [
+ {
+ "valid": true,
+ "id": "YARN_APPLICATION_ATTEMPT_REGISTERED",
+ "timestamp": 1533985554927,
+ "info": {}
+ }
+ ]
+ },
+ "id": "yarn_cluster!test_user!DistributedShell!1533985547564!application_1533985489663_0001!YARN_APPLICATION_ATTEMPT!appattempt_1533985489663_0001_000001",
+ "type": "YARN_APPLICATION_ATTEMPT",
+ "configs": {},
+ "info": {
+ "SYSTEM_INFO_PARENT_ENTITY": {
+ "type": "YARN_APPLICATION",
+ "id": "application_1533985489663_0001"
+ },
+ "YARN_APPLICATION_ATTEMPT_HOST": "test_user/10.171.19.25",
+ "YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER": "container_1533985489663_0001_01_000001",
+ "YARN_APPLICATION_ATTEMPT_ORIGINAL_TRACKING_URL": "N/A",
+ "YARN_APPLICATION_ATTEMPT_RPC_PORT": -1,
+ "YARN_APPLICATION_ATTEMPT_TRACKING_URL": "http://test_user:8088/proxy/application_1533985489663_0001/",
+ "YARN_APPLICATION_ATTEMPT_DIAGNOSTICS_INFO": "",
+ "YARN_APPLICATION_ATTEMPT_STATE": "FINISHED",
+ "YARN_APPLICATION_ATTEMPT_FINAL_STATUS": "SUCCEEDED"
+ },
+ "createdTime": 1533985554927,
+ "relatesToEntities": {},
+ "isRelatedToEntities": {}
+ },
+ {
+ "context": {
+ "clusterId": "yarn_cluster",
+ "userId": "test_user",
+ "flowName": "DistributedShell",
+ "flowRunId": 1533985547564,
+ "appId": "application_1533985489663_0001"
+ },
+ "flowVersion": "1",
+ "subApplicationUser": "test_user",
+ "metrics": {
+ "MEMORY": [
+ {
+ "valid": true,
+ "singleDataTimestamp": 1533985556335,
+ "singleDataValue": 150298624,
+ "id": "MEMORY",
+ "type": "SINGLE_VALUE",
+ "realtimeAggregationOp": "SUM",
+ "values": {
+ "1533985556335": 150298624
+ },
+ "valuesJAXB": {
+ "1533985556335": 150298624
+ }
+ }
+ ]
+ },
+ "events": {
+ "YARN_RM_CONTAINER_CREATED": [
+ {
+ "valid": true,
+ "id": "YARN_RM_CONTAINER_CREATED",
+ "timestamp": 1533985548047,
+ "info": {
+ "YARN_RM_CONTAINER_ALLOCATION_REQUEST_ID": 0
+ }
+ }
+ ],
+ "YARN_CONTAINER_CREATED": [
+ {
+ "valid": true,
+ "id": "YARN_CONTAINER_CREATED",
+ "timestamp": 1533985549474,
+ "info": {
+ "YARN_NM_EVENT_SOURCE": "CONTAINER_EVENT",
+ "YARN_CONTAINER_STATE": "NEW",
+ "YARN_APPLICATION_STATE": "RUNNING"
+ }
+ }
+ ],
+ "YARN_CONTAINER_FINISHED": [
+ {
+ "valid": true,
+ "id": "YARN_CONTAINER_FINISHED",
+ "timestamp": 1533985560616,
+ "info": {
+ "YARN_NM_EVENT_SOURCE": "CONTAINER_EVENT"
+ }
+ }
+ ]
+ },
+ "id": "yarn_cluster!test_user!DistributedShell!1533985547564!application_1533985489663_0001!YARN_CONTAINER!container_1533985489663_0001_01_000001",
+ "type": "YARN_CONTAINER",
+ "configs": {},
+ "info": {
+ "YARN_CONTAINER_ALLOCATED_PORT": 13076,
+ "YARN_CONTAINER_ALLOCATED_MEMORY": 1024,
+ "YARN_CONTAINER_ALLOCATED_PRIORITY": "0",
+ "YARN_CONTAINER_ALLOCATED_HOST": "test_user",
+ "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS": "test_user:8042",
+ "YARN_CONTAINER_ALLOCATED_VCORE": 1,
+ "SYSTEM_INFO_PARENT_ENTITY": {
+ "type": "YARN_APPLICATION_ATTEMPT",
+ "id": "appattempt_1533985489663_0001_000001"
+ },
+ "YARN_CONTAINER_STATE": "COMPLETE",
+ "YARN_CONTAINER_EXIT_STATUS": 0,
+ "YARN_CONTAINER_DIAGNOSTICS_INFO": "",
+ "YARN_CONTAINER_FINISHED_TIME": 1533985560616
+ },
+ "createdTime": 1533985549474,
+ "relatesToEntities": {},
+ "isRelatedToEntities": {}
+ },
+ {
+ "context": {
+ "clusterId": "yarn_cluster",
+ "userId": "test_user",
+ "flowName": "DistributedShell",
+ "flowRunId": 1533985547564,
+ "appId": "application_1533985489663_0001"
+ },
+ "flowVersion": "1",
+ "subApplicationUser": "test_user",
+ "metrics": {},
+ "events": {
+ "CONTAINER_LAUNCHED": [
+ {
+ "valid": true,
+ "id": "CONTAINER_LAUNCHED",
+ "timestamp": 1533985557747,
+ "info": {
+ "YARN_NM_EVENT_SOURCE": "CONTAINER_EVENT",
+ "YARN_CONTAINER_STATE": "LOCALIZED",
+ "YARN_APPLICATION_STATE": "RUNNING"
+ }
+ }
+ ]
+ },
+ "id": "yarn_cluster!test_user!DistributedShell!1533985547564!application_1533985489663_0001!YARN_CONTAINER!container_1533985489663_0001_01_000002",
+ "type": "YARN_CONTAINER",
+ "configs": {},
+ "info": {
+ "SYSTEM_INFO_PARENT_ENTITY": {
+ "type": "YARN_APPLICATION_ATTEMPT",
+ "id": "appattempt_1533985489663_0001_000001"
+ }
+ },
+ "createdTime": 0,
+ "relatesToEntities": {},
+ "isRelatedToEntities": {}
+ }
+]
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-app-doc.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-app-doc.json
new file mode 100755
index 0000000..510ed47
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-app-doc.json
@@ -0,0 +1,203 @@
+{
+ "context": {
+ "clusterId": "yarn_cluster",
+ "userId": "test_user",
+ "flowName": "DistributedShell",
+ "flowRunId": 1533871599510,
+ "appId": "application_1533871545292_0001"
+ },
+ "flowVersion": "1",
+ "subApplicationUser": "test_user",
+ "metrics": {
+ "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED": [
+ {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_NON_AM_CONTAINER_PREEMPTED",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ }
+ ],
+ "YARN_APPLICATION_RESOURCE_PREEMPTED_CPU": [
+ {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_RESOURCE_PREEMPTED_CPU",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ }
+ ],
+ "YARN_APPLICATION_CPU_PREEMPT_METRIC": [
+ {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_CPU_PREEMPT_METRIC",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ }
+ ],
+ "YARN_APPLICATION_MEMORY": [
+ {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 19858,
+ "id": "YARN_APPLICATION_MEMORY",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 19858
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 19858
+ }
+ }
+ ],
+ "YARN_APPLICATION_MEM_PREEMPT_METRIC": [
+ {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_MEM_PREEMPT_METRIC",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ }
+ ],
+ "YARN_APPLICATION_CPU": [
+ {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 19,
+ "id": "YARN_APPLICATION_CPU",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 19
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 19
+ }
+ }
+ ],
+ "YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY": [
+ {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_RESOURCE_PREEMPTED_MEMORY",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ }
+ ],
+ "YARN_APPLICATION_AM_CONTAINER_PREEMPTED": [
+ {
+ "valid": true,
+ "singleDataTimestamp": 1533871614645,
+ "singleDataValue": 0,
+ "id": "YARN_APPLICATION_AM_CONTAINER_PREEMPTED",
+ "type": "SINGLE_VALUE",
+ "values": {
+ "1533871614645": 0
+ },
+ "realtimeAggregationOp": "NOP",
+ "valuesJAXB": {
+ "1533871614645": 0
+ }
+ }
+ ]
+ },
+ "events": {
+ "YARN_APPLICATION_CREATED": [
+ {
+ "valid": true,
+ "id": "YARN_APPLICATION_CREATED",
+ "timestamp": 1533871599510,
+ "info": {}
+ }
+ ],
+ "YARN_APPLICATION_ACLS_UPDATED": [
+ {
+ "valid": true,
+ "id": "YARN_APPLICATION_ACLS_UPDATED",
+ "timestamp": 1533871599671,
+ "info": {}
+ }
+ ],
+ "YARN_APPLICATION_STATE_UPDATED": [
+ {
+ "valid": true,
+ "id": "YARN_APPLICATION_STATE_UPDATED",
+ "timestamp": 1533871608094,
+ "info": {
+ "YARN_APPLICATION_STATE": "RUNNING"
+ }
+ }
+ ],
+ "YARN_APPLICATION_FINISHED": [
+ {
+ "valid": true,
+ "id": "YARN_APPLICATION_FINISHED",
+ "timestamp": 1533871614645,
+ "info": {}
+ }
+ ]
+ },
+ "id": "yarn_cluster!test_user!DistributedShell!1533871599510!application_1533871545292_0001!YARN_APPLICATION!application_1533871545292_0001",
+ "type": "YARN_APPLICATION",
+ "createdTime": 1533871599510,
+ "info": {
+ "YARN_APPLICATION_VIEW_ACLS": "",
+ "YARN_APPLICATION_SUBMITTED_TIME": 1533871599446,
+ "YARN_AM_CONTAINER_LAUNCH_COMMAND": [
+ "{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr "
+ ],
+ "YARN_APPLICATION_NAME": "DistributedShell",
+ "YARN_APPLICATION_USER": "test_user",
+ "YARN_APPLICATION_QUEUE": "default",
+ "YARN_APPLICATION_TYPE": "YARN",
+ "YARN_APPLICATION_UNMANAGED_APPLICATION": false,
+ "YARN_APPLICATION_TAGS": [],
+ "YARN_APPLICATION_STATE": "FINISHED",
+ "YARN_APPLICATION_DIAGNOSTICS_INFO": "",
+ "YARN_APPLICATION_FINAL_STATUS": "SUCCEEDED",
+ "YARN_APPLICATION_LATEST_APP_ATTEMPT": "appattempt_1533871545292_0001_000001"
+ },
+ "configs": {
+ "YARN_AM_NODE_LABEL_EXPRESSION": "<DEFAULT_PARTITION>",
+ "YARN_APP_NODE_LABEL_EXPRESSION": "<Not set>"
+ },
+ "relatesToEntities": {},
+ "isRelatedToEntities": {}
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-entities.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-entities.json
new file mode 100755
index 0000000..9980c64
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/test/resources/documents/timeline-entities.json
@@ -0,0 +1,119 @@
+[
+ {
+ "identifier": {
+ "type": "YARN_APPLICATION",
+ "id": "application_1532614298307_0002"
+ },
+ "info": {
+ "YARN_APPLICATION_VIEW_ACLS": ""
+ },
+ "configs": {},
+ "metrics": [],
+ "events": [
+ {
+ "id": "YARN_APPLICATION_ACLS_UPDATED",
+ "info": {},
+ "timestamp": 1532614542444,
+ "valid": true,
+ "infoJAXB": {}
+ }
+ ],
+ "id": "application_1532614298307_0002",
+ "type": "YARN_APPLICATION",
+ "valid": true,
+ "configsJAXB": {},
+ "infoJAXB": {
+ "YARN_APPLICATION_VIEW_ACLS": ""
+ },
+ "relatesToEntitiesJAXB": {},
+ "isRelatedToEntitiesJAXB": {},
+ "isrelatedto": {},
+ "relatesto": {},
+ "createdtime": null,
+ "idprefix": 0
+ },
+ {
+ "identifier": {
+ "type": "YARN_APPLICATION_ATTEMPT",
+ "id": "appattempt_1532614298307_0002_000001"
+ },
+ "info": {
+ "YARN_APPLICATION_ATTEMPT_HOST": "test_user/10.171.19.36",
+ "YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER": "container_1532614298307_0002_01_000001",
+ "YARN_APPLICATION_ATTEMPT_ORIGINAL_TRACKING_URL": "N/A",
+ "YARN_APPLICATION_ATTEMPT_RPC_PORT": -1,
+ "YARN_APPLICATION_ATTEMPT_TRACKING_URL": "http://test_user:8088/proxy/application_1532614298307_0002/"
+ },
+ "configs": {},
+ "metrics": [],
+ "events": [
+ {
+ "id": "YARN_APPLICATION_ATTEMPT_REGISTERED",
+ "info": {},
+ "timestamp": 1532614551262,
+ "valid": true,
+ "infoJAXB": {}
+ }
+ ],
+ "id": "appattempt_1532614298307_0002_000001",
+ "type": "YARN_APPLICATION_ATTEMPT",
+ "valid": true,
+ "configsJAXB": {},
+ "infoJAXB": {
+ "YARN_APPLICATION_ATTEMPT_HOST": "test_user/10.171.19.36",
+ "YARN_APPLICATION_ATTEMPT_MASTER_CONTAINER": "container_1532614298307_0002_01_000001",
+ "YARN_APPLICATION_ATTEMPT_ORIGINAL_TRACKING_URL": "N/A",
+ "YARN_APPLICATION_ATTEMPT_RPC_PORT": -1,
+ "YARN_APPLICATION_ATTEMPT_TRACKING_URL": "http://test_user:8088/proxy/application_1532614298307_0002/"
+ },
+ "relatesToEntitiesJAXB": {},
+ "isRelatedToEntitiesJAXB": {},
+ "isrelatedto": {},
+ "relatesto": {},
+ "createdtime": 1532614551262,
+ "idprefix": 0
+ },
+ {
+ "identifier": {
+ "type": "YARN_CONTAINER",
+ "id": "container_1532614298307_0002_01_000001"
+ },
+ "info": {
+ "YARN_CONTAINER_ALLOCATED_PORT": 2032,
+ "YARN_CONTAINER_ALLOCATED_MEMORY": 1024,
+ "YARN_CONTAINER_ALLOCATED_PRIORITY": 0,
+ "YARN_CONTAINER_ALLOCATED_HOST": "test_user",
+ "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS": "http://test_user:8042",
+ "YARN_CONTAINER_ALLOCATED_VCORE": 1
+ },
+ "configs": {},
+ "metrics": [],
+ "events": [
+ {
+ "id": "YARN_RM_CONTAINER_CREATED",
+ "info": {},
+ "timestamp": 1532614543389,
+ "valid": true,
+ "infoJAXB": {}
+ }
+ ],
+ "id": "container_1532614298307_0002_01_000001",
+ "type": "YARN_CONTAINER",
+ "valid": true,
+ "configsJAXB": {},
+ "infoJAXB": {
+ "YARN_CONTAINER_ALLOCATED_PORT": 2032,
+ "YARN_CONTAINER_ALLOCATED_MEMORY": 1024,
+ "YARN_CONTAINER_ALLOCATED_PRIORITY": 0,
+ "YARN_CONTAINER_ALLOCATED_HOST": "test_user",
+ "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS": "http://test_user:8042",
+ "YARN_CONTAINER_ALLOCATED_VCORE": 1
+ },
+ "relatesToEntitiesJAXB": {},
+ "isRelatedToEntitiesJAXB": {},
+ "isrelatedto": {},
+ "relatesto": {},
+ "createdtime": 1532614543389,
+ "idprefix": 0
+ }
+]
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/metrics/PerNodeAggTimelineCollectorMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/metrics/PerNodeAggTimelineCollectorMetrics.java
index 0da9258..024c61c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/metrics/PerNodeAggTimelineCollectorMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/metrics/PerNodeAggTimelineCollectorMetrics.java
@@ -62,7 +62,7 @@ final public class PerNodeAggTimelineCollectorMetrics {
synchronized (PerNodeAggTimelineCollectorMetrics.class) {
if (instance == null) {
instance =
- DefaultMetricsSystem.initialize("TimelineService").register(
+ DefaultMetricsSystem.instance().register(
METRICS_INFO.name(), METRICS_INFO.description(),
new PerNodeAggTimelineCollectorMetrics());
isInitialized.set(true);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
index 2f51023..0855105 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
@@ -46,5 +46,6 @@
<module>hadoop-yarn-server-timelineservice-hbase</module>
<module>hadoop-yarn-server-timelineservice-hbase-tests</module>
<module>hadoop-yarn-server-router</module>
+ <module>hadoop-yarn-server-timelineservice-documentstore</module>
</modules>
</project>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org