Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/07/07 03:15:07 UTC

hadoop git commit: YARN-3051. Created storage oriented reader interface for fetching raw entity data and made the filesystem based implementation. Contributed by Varun Saxena.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 683755269 -> 499ce52c7


YARN-3051. Created storage oriented reader interface for fetching raw entity data and made the filesystem based implementation. Contributed by Varun Saxena.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/499ce52c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/499ce52c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/499ce52c

Branch: refs/heads/YARN-2928
Commit: 499ce52c7b645ec0b1cc8ac62dc9a3127b987a20
Parents: 6837552
Author: Zhijie Shen <zj...@apache.org>
Authored: Mon Jul 6 18:11:27 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Jul 6 18:11:27 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../records/timelineservice/TimelineEntity.java |   5 +
 .../storage/FileSystemTimelineReaderImpl.java   | 490 ++++++++++++++++
 .../timelineservice/storage/TimelineReader.java | 162 ++++++
 .../TestFileSystemTimelineReaderImpl.java       | 556 +++++++++++++++++++
 5 files changed, 1216 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 91358c1..1586e7f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -73,6 +73,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3801. [JDK-8] Exclude jdk.tools from hbase-client and
     hbase-testing-util (Tsuyoshi Ozawa via sjlee)
 
+    YARN-3051. Created storage oriented reader interface for fetching raw entity
+    data and made the filesystem based implementation. (Varun Saxena via zjshen)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index a641f32..60fba85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records.timelineservice;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.codehaus.jackson.annotate.JsonSetter;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -335,6 +336,7 @@ public class TimelineEntity {
     }
   }
 
+  @JsonSetter("isrelatedto")
   public void setIsRelatedToEntities(
       Map<String, Set<String>> isRelatedToEntities) {
     if (real == null) {
@@ -423,6 +425,7 @@ public class TimelineEntity {
     }
   }
 
+  @JsonSetter("relatesto")
   public void setRelatesToEntities(Map<String, Set<String>> relatesToEntities) {
     if (real == null) {
       this.relatesToEntities =
@@ -441,6 +444,7 @@ public class TimelineEntity {
     }
   }
 
+  @JsonSetter("createdtime")
   public void setCreatedTime(long createdTime) {
     if (real == null) {
       this.createdTime = createdTime;
@@ -458,6 +462,7 @@ public class TimelineEntity {
     }
   }
 
+  @JsonSetter("modifiedtime")
   public void setModifiedTime(long modifiedTime) {
     if (real == null) {
       this.modifiedTime = modifiedTime;

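The @JsonSetter annotations added above bind lowercase JSON property names to
their setters, so entity files written with lowercase keys (e.g. "createdtime")
deserialize correctly through the mapper configured by
YarnJacksonJaxbJsonProvider. A minimal round-trip sketch, assuming Jackson 1.x
on the classpath; the JSON literal is illustrative, not taken from this commit:

    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
    import org.codehaus.jackson.map.ObjectMapper;

    public class JsonSetterSketch {
      public static void main(String[] args) throws Exception {
        // Same mapper setup as FileSystemTimelineReaderImpl below.
        ObjectMapper mapper = new ObjectMapper();
        YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
        // "createdtime" binds to setCreatedTime() via @JsonSetter.
        String json =
            "{\"type\": \"app\", \"id\": \"id_1\", \"createdtime\": 1425016502000}";
        TimelineEntity entity = mapper.readValue(json, TimelineEntity.class);
        System.out.println(entity.getCreatedTime()); // 1425016502000
      }
    }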
http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
new file mode 100644
index 0000000..f9f1d1d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * File system based implementation of {@link TimelineReader}.
+ */
+public class FileSystemTimelineReaderImpl extends AbstractService
+    implements TimelineReader {
+
+  private static final Log LOG =
+      LogFactory.getLog(FileSystemTimelineReaderImpl.class);
+
+  private String rootPath;
+  private static final String ENTITIES_DIR = "entities";
+
+  /** Default extension for output files. */
+  private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
+
+  @VisibleForTesting
+  /** Name of the app flow mapping CSV file. */
+  static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
+
+  @VisibleForTesting
+  /** Config param for timeline service file system storage root. */
+  static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+  @VisibleForTesting
+  /** Default value for storage location on local disk. */
+  static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+      "/tmp/timeline_service_data";
+
+  private final CSVFormat csvFormat =
+      CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+
+  public FileSystemTimelineReaderImpl() {
+    super(FileSystemTimelineReaderImpl.class.getName());
+  }
+
+  @VisibleForTesting
+  String getRootPath() {
+    return rootPath;
+  }
+
+  private static ObjectMapper mapper;
+
+  static {
+    mapper = new ObjectMapper();
+    YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
+  }
+
+  /**
+   * Deserialize a POJO from a JSON string.
+   *
+   * @param jsonString
+   *    JSON string to deserialize
+   * @param clazz
+   *    class to deserialize the JSON string into
+   * @return a deserialized object of type T
+   * @throws IOException
+   * @throws JsonMappingException
+   * @throws JsonGenerationException
+   */
+  public static <T> T getTimelineRecordFromJSON(
+      String jsonString, Class<T> clazz)
+      throws JsonGenerationException, JsonMappingException, IOException {
+    return mapper.readValue(jsonString, clazz);
+  }
+
+  private static void fillFields(TimelineEntity finalEntity,
+      TimelineEntity real, EnumSet<Field> fields) {
+    if (fields.contains(Field.ALL)) {
+      finalEntity.setConfigs(real.getConfigs());
+      finalEntity.setMetrics(real.getMetrics());
+      finalEntity.setInfo(real.getInfo());
+      finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+      finalEntity.setRelatesToEntities(real.getRelatesToEntities());
+      finalEntity.setEvents(real.getEvents());
+      return;
+    }
+    for (Field field : fields) {
+      switch(field) {
+      case CONFIGS:
+        finalEntity.setConfigs(real.getConfigs());
+        break;
+      case METRICS:
+        finalEntity.setMetrics(real.getMetrics());
+        break;
+      case INFO:
+        finalEntity.setInfo(real.getInfo());
+        break;
+      case IS_RELATED_TO:
+        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+        break;
+      case RELATES_TO:
+        finalEntity.setRelatesToEntities(real.getRelatesToEntities());
+        break;
+      case EVENTS:
+        finalEntity.setEvents(real.getEvents());
+        break;
+      default:
+        continue;
+      }
+    }
+  }
+
+  private static boolean matchFilter(Object infoValue, Object filterValue) {
+    return infoValue.equals(filterValue);
+  }
+
+  private static boolean matchFilters(Map<String, ? extends Object> entityInfo,
+      Map<String, ? extends Object> filters) {
+    if (entityInfo == null || entityInfo.isEmpty()) {
+      return false;
+    }
+    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+      Object infoValue = entityInfo.get(filter.getKey());
+      if (infoValue == null) {
+        return false;
+      }
+      if (!matchFilter(infoValue, filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private String getFlowRunPath(String userId, String clusterId, String flowId,
+      Long flowRunId, String appId)
+      throws IOException {
+    if (userId != null && flowId != null && flowRunId != null) {
+      return userId + "/" + flowId + "/" + flowRunId;
+    }
+    if (clusterId == null || appId == null) {
+      throw new IOException("Unable to get flow info");
+    }
+    String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
+        clusterId + "/" + APP_FLOW_MAPPING_FILE;
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(
+            new FileInputStream(
+                appFlowMappingFile), Charset.forName("UTF-8")));
+        CSVParser parser = new CSVParser(reader, csvFormat)) {
+      for (CSVRecord record : parser.getRecords()) {
+        if (record.size() < 4) {
+          continue;
+        }
+        String applicationId = record.get("APP");
+        if (applicationId != null && !applicationId.trim().isEmpty() &&
+            !applicationId.trim().equals(appId)) {
+          continue;
+        }
+        return record.get(1).trim() + "/" + record.get(2).trim() + "/" +
+            record.get(3).trim();
+      }
+    }
+    throw new IOException("Unable to get flow info");
+  }
+
+  private static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> tempMetrics = new HashSet<String>();
+    for (TimelineMetric metric : metrics) {
+      tempMetrics.add(metric.getId());
+    }
+
+    for (String metricFilter : metricFilters) {
+      if (!tempMetrics.contains(metricFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> tempEvents = new HashSet<String>();
+    for (TimelineEvent event : entityEvents) {
+      tempEvents.add(event.getId());
+    }
+
+    for (String eventFilter : eventFilters) {
+      if (!tempEvents.contains(eventFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
+      EnumSet<Field> fieldsToRetrieve) {
+    TimelineEntity entityToBeReturned = new TimelineEntity();
+    entityToBeReturned.setIdentifier(entity.getIdentifier());
+    entityToBeReturned.setCreatedTime(entity.getCreatedTime());
+    entityToBeReturned.setModifiedTime(entity.getModifiedTime());
+    if (fieldsToRetrieve != null) {
+      fillFields(entityToBeReturned, entity, fieldsToRetrieve);
+    }
+    return entityToBeReturned;
+  }
+
+  private static boolean isTimeInRange(Long time, Long timeBegin,
+      Long timeEnd) {
+    return (time >= timeBegin) && (time <= timeEnd);
+  }
+
+  private static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relations) {
+    for (Map.Entry<String, Set<String>> relation : relations.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+      if (ids == null) {
+        return false;
+      }
+      for (String id : relation.getValue()) {
+        if (!ids.contains(id)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  private static void mergeEntities(TimelineEntity entity1,
+      TimelineEntity entity2) {
+    // Ideally, created time won't change, except when the client sends bad data.
+    if (entity2.getCreatedTime() > 0) {
+      entity1.setCreatedTime(entity2.getCreatedTime());
+    }
+    if (entity2.getModifiedTime() > 0) {
+      entity1.setModifiedTime(entity2.getModifiedTime());
+    }
+    for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) {
+      entity1.addConfig(configEntry.getKey(), configEntry.getValue());
+    }
+    for (Entry<String, Object> infoEntry : entity2.getInfo().entrySet()) {
+      entity1.addInfo(infoEntry.getKey(), infoEntry.getValue());
+    }
+    for (Entry<String, Set<String>> isRelatedToEntry :
+        entity2.getIsRelatedToEntities().entrySet()) {
+      String type = isRelatedToEntry.getKey();
+      for (String entityId : isRelatedToEntry.getValue()) {
+        entity1.addIsRelatedToEntity(type, entityId);
+      }
+    }
+    for (Entry<String, Set<String>> relatesToEntry :
+        entity2.getRelatesToEntities().entrySet()) {
+      String type = relatesToEntry.getKey();
+      for (String entityId : relatesToEntry.getValue()) {
+        entity1.addRelatesToEntity(type, entityId);
+      }
+    }
+    for (TimelineEvent event : entity2.getEvents()) {
+      entity1.addEvent(event);
+    }
+    for (TimelineMetric metric2 : entity2.getMetrics()) {
+      boolean found = false;
+      for (TimelineMetric metric1 : entity1.getMetrics()) {
+        if (metric1.getId().equals(metric2.getId())) {
+          metric1.addValues(metric2.getValues());
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        entity1.addMetric(metric2);
+      }
+    }
+  }
+
+  private static TimelineEntity readEntityFromFile(BufferedReader reader)
+      throws IOException {
+    TimelineEntity entity =
+        getTimelineRecordFromJSON(reader.readLine(), TimelineEntity.class);
+    String entityStr = "";
+    while ((entityStr = reader.readLine()) != null) {
+      if (entityStr.trim().isEmpty()) {
+        continue;
+      }
+      TimelineEntity anotherEntity =
+          getTimelineRecordFromJSON(entityStr, TimelineEntity.class);
+      if (!entity.getId().equals(anotherEntity.getId()) ||
+          !entity.getType().equals(anotherEntity.getType())) {
+        continue;
+      }
+      mergeEntities(entity, anotherEntity);
+    }
+    return entity;
+  }
+
+  private Set<TimelineEntity> getEntities(File dir, String entityType,
+      Long limit, Long createdTimeBegin,
+      Long createdTimeEnd, Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    if (limit == null || limit <= 0) {
+      limit = DEFAULT_LIMIT;
+    }
+    if (createdTimeBegin == null || createdTimeBegin <= 0) {
+      createdTimeBegin = 0L;
+    }
+    if (createdTimeEnd == null || createdTimeEnd <= 0) {
+      createdTimeEnd = Long.MAX_VALUE;
+    }
+    if (modifiedTimeBegin == null || modifiedTimeBegin <= 0) {
+      modifiedTimeBegin = 0L;
+    }
+    if (modifiedTimeEnd == null || modifiedTimeEnd <= 0) {
+      modifiedTimeEnd = Long.MAX_VALUE;
+    }
+
+    // First sort the selected entities based on created/start time.
+    Map<Long, Set<TimelineEntity>> sortedEntities =
+        new TreeMap<>(
+          new Comparator<Long>() {
+            @Override
+            public int compare(Long l1, Long l2) {
+              return l2.compareTo(l1);
+            }
+          }
+        );
+    for (File entityFile : dir.listFiles()) {
+      if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
+        continue;
+      }
+      try (BufferedReader reader =
+          new BufferedReader(
+              new InputStreamReader(
+                  new FileInputStream(
+                      entityFile), Charset.forName("UTF-8")))) {
+        TimelineEntity entity = readEntityFromFile(reader);
+        if (!entity.getType().equals(entityType)) {
+          continue;
+        }
+        if (!isTimeInRange(entity.getCreatedTime(), createdTimeBegin,
+            createdTimeEnd)) {
+          continue;
+        }
+        if (!isTimeInRange(entity.getModifiedTime(), modifiedTimeBegin,
+            modifiedTimeEnd)) {
+          continue;
+        }
+        if (relatesTo != null && !relatesTo.isEmpty() &&
+            !matchRelations(entity.getRelatesToEntities(), relatesTo)) {
+          continue;
+        }
+        if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
+            !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
+          continue;
+        }
+        if (infoFilters != null && !infoFilters.isEmpty() &&
+            !matchFilters(entity.getInfo(), infoFilters)) {
+          continue;
+        }
+        if (configFilters != null && !configFilters.isEmpty() &&
+            !matchFilters(entity.getConfigs(), configFilters)) {
+          continue;
+        }
+        if (metricFilters != null && !metricFilters.isEmpty() &&
+            !matchMetricFilters(entity.getMetrics(), metricFilters)) {
+          continue;
+        }
+        if (eventFilters != null && !eventFilters.isEmpty() &&
+            !matchEventFilters(entity.getEvents(), eventFilters)) {
+          continue;
+        }
+        TimelineEntity entityToBeReturned =
+            createEntityToBeReturned(entity, fieldsToRetrieve);
+        Set<TimelineEntity> entitiesCreatedAtSameTime =
+            sortedEntities.get(entityToBeReturned.getCreatedTime());
+        if (entitiesCreatedAtSameTime == null) {
+          entitiesCreatedAtSameTime = new HashSet<TimelineEntity>();
+        }
+        entitiesCreatedAtSameTime.add(entityToBeReturned);
+        sortedEntities.put(
+            entityToBeReturned.getCreatedTime(), entitiesCreatedAtSameTime);
+      }
+    }
+
+    Set<TimelineEntity> entities = new HashSet<TimelineEntity>();
+    long entitiesAdded = 0;
+    for (Set<TimelineEntity> entitySet : sortedEntities.values()) {
+      for (TimelineEntity entity : entitySet) {
+        entities.add(entity);
+        ++entitiesAdded;
+        if (entitiesAdded >= limit) {
+          return entities;
+        }
+      }
+    }
+    return entities;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public TimelineEntity getEntity(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve) throws IOException {
+    String flowRunPath = getFlowRunPath(userId, clusterId, flowId,
+        flowRunId, appId);
+    File dir = new File(new File(rootPath, ENTITIES_DIR),
+        clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType);
+    File entityFile =
+        new File(dir, entityId + TIMELINE_SERVICE_STORAGE_EXTENSION);
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(
+            new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+      TimelineEntity entity = readEntityFromFile(reader);
+      return createEntityToBeReturned(entity, fieldsToRetrieve);
+    }
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    String flowRunPath =
+        getFlowRunPath(userId, clusterId, flowId, flowRunId, appId);
+    File dir =
+        new File(new File(rootPath, ENTITIES_DIR),
+            clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType);
+    return getEntities(dir, entityType, limit,
+        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+        eventFilters, fieldsToRetrieve);
+  }
+}
\ No newline at end of file

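For reference, the on-disk layout the new reader expects, as implied by
getFlowRunPath() and getEntity() above (the root defaults to
/tmp/timeline_service_data and is configurable via
yarn.timeline-service.fs-writer.root-dir):

    <root-dir>/entities/<clusterId>/app_flow_mapping.csv
    <root-dir>/entities/<clusterId>/<userId>/<flowId>/<flowRunId>/
        <appId>/<entityType>/<entityId>.thist

The CSV carries APP, USER, FLOW and FLOWRUN columns and is consulted only when
the caller does not supply the user/flow/flow-run context. Each .thist file
holds one JSON entity per line; lines with the same id and type are merged by
readEntityFromFile().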
http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
new file mode 100644
index 0000000..e4e305e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+
+/** ATSv2 reader interface. */
+@Private
+@Unstable
+public interface TimelineReader extends Service {
+
+  /**
+   * Default limit for {@link #getEntities}.
+   */
+  long DEFAULT_LIMIT = 100;
+
+  /**
+   * Possible fields to retrieve for {@link #getEntities} and
+   * {@link #getEntity}.
+   */
+  public enum Field {
+    ALL,
+    EVENTS,
+    INFO,
+    METRICS,
+    CONFIGS,
+    RELATES_TO,
+    IS_RELATED_TO
+  }
+
+  /**
+   * <p>The API to fetch a single entity, given its identifier, in the
+   * scope of the given context.</p>
+   *
+   * @param userId
+   *    Context user Id (optional).
+   * @param clusterId
+   *    Context cluster Id (mandatory).
+   * @param flowId
+   *    Context flow Id (optional).
+   * @param flowRunId
+   *    Context flow run Id (optional).
+   * @param appId
+   *    Context app Id (mandatory).
+   * @param entityType
+   *    Entity type (mandatory).
+   * @param entityId
+   *    Entity Id (mandatory).
+   * @param fieldsToRetrieve
+   *    Specifies which fields of the entity object to retrieve (optional),
+   *    see {@link Field}. If null, retrieves 4 fields, namely entity id,
+   *    entity type, entity created time and entity modified time. All
+   *    fields will be retrieved if {@link Field#ALL} is specified.
+   * @return a {@link TimelineEntity} instance or null. The entity will
+   *    contain the metadata plus the given fields to retrieve.
+   * @throws IOException
+   */
+  TimelineEntity getEntity(String userId, String clusterId, String flowId,
+      Long flowRunId, String appId, String entityType, String entityId,
+      EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+  /**
+   * <p>The API to search for a set of entities of the given entity type in
+   * the scope of the given context which match the given predicates. The
+   * predicates include the created/modified time window and a limit on the
+   * number of entities to return. Entities can further be filtered by
+   * whether they contain the given info/config entries in the form of
+   * key/value pairs, the given metrics (by metric Id), the given events (by
+   * event Id), and whether they relate to/are related to other entities.
+   * For those parameters which have multiple entries, a qualifying entity
+   * must match all of them.</p>
+   *
+   * @param userId
+   *    Context user Id (optional).
+   * @param clusterId
+   *    Context cluster Id (mandatory).
+   * @param flowId
+   *    Context flow Id (optional).
+   * @param flowRunId
+   *    Context flow run Id (optional).
+   * @param appId
+   *    Context app Id (mandatory).
+   * @param entityType
+   *    Entity type (mandatory).
+   * @param limit
+   *    A limit on the number of entities to return (optional). If null or <=0,
+   *    defaults to {@link #DEFAULT_LIMIT}.
+   * @param createdTimeBegin
+   *    Matched entities should not be created before this timestamp (optional).
+   *    If null or <=0, defaults to 0.
+   * @param createdTimeEnd
+   *    Matched entities should not be created after this timestamp (optional).
+   *    If null or <=0, defaults to {@link Long#MAX_VALUE}.
+   * @param modifiedTimeBegin
+   *    Matched entities should not be modified before this timestamp
+   *    (optional). If null or <=0, defaults to 0.
+   * @param modifiedTimeEnd
+   *    Matched entities should not be modified after this timestamp (optional).
+   *    If null or <=0, defaults to {@link Long#MAX_VALUE}.
+   * @param relatesTo
+   *    Matched entities should relate to given entities (optional).
+   * @param isRelatedTo
+   *    Matched entities should be related to given entities (optional).
+   * @param infoFilters
+   *    Matched entities should have exact matches to the given info represented
+   *    as key-value pairs (optional). If null or empty, the filter is not
+   *    applied.
+   * @param configFilters
+   *    Matched entities should have exact matches to the given configs
+   *    represented as key-value pairs (optional). If null or empty, the filter
+   *    is not applied.
+   * @param metricFilters
+   *    Matched entities should contain the given metrics (optional). If null
+   *    or empty, the filter is not applied.
+   * @param eventFilters
+   *    Matched entities should contain the given events (optional). If null
+   *    or empty, the filter is not applied.
+   * @param fieldsToRetrieve
+   *    Specifies which fields of the entity object to retrieve (optional),
+   *    see {@link Field}. If null, retrieves 4 fields, namely entity id,
+   *    entity type, entity created time and entity modified time. All
+   *    fields will be retrieved if {@link Field#ALL} is specified.
+   * @return A set of {@link TimelineEntity} instances of the given entity type
+   *    in the given context scope which match the given predicates,
+   *    ordered by created time, descending. Each entity will only contain
+   *    the metadata (id, type, created and modified times) plus the given
+   *    fields to retrieve.
+   * @throws IOException
+   */
+  Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException;
+}
\ No newline at end of file

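A minimal usage sketch of the new interface against the filesystem
implementation (context values are illustrative, mirroring the unit test
below; entity data under the root dir is assumed to already exist):

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;

    public class TimelineReaderSketch {
      public static void main(String[] args) throws Exception {
        TimelineReader reader = new FileSystemTimelineReaderImpl();
        Configuration conf = new YarnConfiguration();
        // Same key as TIMELINE_SERVICE_STORAGE_DIR_ROOT above.
        conf.set("yarn.timeline-service.fs-writer.root-dir",
            "/tmp/timeline_service_data");
        reader.init(conf);
        reader.start();
        // Fetch a single entity with all fields populated.
        TimelineEntity entity = reader.getEntity("user1", "cluster1", "flow1",
            1L, "app1", "app", "id_1", EnumSet.of(Field.ALL));
        System.out.println(entity.getId() + " created at "
            + entity.getCreatedTime());
        reader.stop();
      }
    }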
http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
new file mode 100644
index 0000000..4e23e49
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileSystemTimelineReaderImpl {
+
+  private static final String rootDir =
+      FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+  FileSystemTimelineReaderImpl reader;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    loadEntityData();
+    // Create app flow mapping file.
+    CSVFormat format =
+        CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+    String appFlowMappingFile = rootDir + "/entities/cluster1/" +
+        FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(
+            new FileWriter(appFlowMappingFile, true)));
+        CSVPrinter printer = new CSVPrinter(out, format)) {
+      printer.printRecord("app1", "user1", "flow1", 1);
+      printer.printRecord("app2", "user1", "flow1,flow", 1);
+    }
+    (new File(rootDir)).deleteOnExit();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    FileUtils.deleteDirectory(new File(rootDir));
+  }
+
+  @Before
+  public void init() throws Exception {
+    reader = new FileSystemTimelineReaderImpl();
+    Configuration conf = new YarnConfiguration();
+    conf.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        rootDir);
+    reader.init(conf);
+  }
+
+  private static void writeEntityFile(TimelineEntity entity, File dir)
+      throws Exception {
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Could not create directories for " + dir);
+      }
+    }
+    String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))) {
+      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      out.write("\n");
+    }
+  }
+
+  private static void loadEntityData() throws Exception {
+    File appDir = new File(rootDir +
+        "/entities/cluster1/user1/flow1/1/app1/app/");
+    TimelineEntity entity11 = new TimelineEntity();
+    entity11.setId("id_1");
+    entity11.setType("app");
+    entity11.setCreatedTime(1425016502000L);
+    entity11.setModifiedTime(1425016502050L);
+    Map<String, Object> info1 = new HashMap<String, Object>();
+    info1.put("info1", "val1");
+    entity11.addInfo(info1);
+    TimelineEvent event = new TimelineEvent();
+    event.setId("event_1");
+    event.setTimestamp(1425016502003L);
+    entity11.addEvent(event);
+    Set<TimelineMetric> metrics = new HashSet<TimelineMetric>();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setId("metric1");
+    metric1.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric1.addValue(1425016502006L, 113.2F);
+    metrics.add(metric1);
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setId("metric2");
+    metric2.setType(TimelineMetric.Type.TIME_SERIES);
+    metric2.addValue(1425016502016L, 34);
+    metrics.add(metric2);
+    entity11.setMetrics(metrics);
+    Map<String, String> configs = new HashMap<String, String>();
+    configs.put("config_1", "123");
+    entity11.setConfigs(configs);
+    entity11.addRelatesToEntity("flow", "flow1");
+    entity11.addIsRelatedToEntity("type1", "tid1_1");
+    writeEntityFile(entity11, appDir);
+    TimelineEntity entity12 = new TimelineEntity();
+    entity12.setId("id_1");
+    entity12.setType("app");
+    entity12.setModifiedTime(1425016503000L);
+    configs.clear();
+    configs.put("config_2", "23");
+    configs.put("config_3", "abc");
+    entity12.addConfigs(configs);
+    metrics.clear();
+    TimelineMetric metric12 = new TimelineMetric();
+    metric12.setId("metric2");
+    metric12.setType(TimelineMetric.Type.TIME_SERIES);
+    metric12.addValue(1425016502032L, 48);
+    metric12.addValue(1425016502054L, 51);
+    metrics.add(metric12);
+    TimelineMetric metric3 = new TimelineMetric();
+    metric3.setId("metric3");
+    metric3.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric3.addValue(1425016502060L, 23L);
+    metrics.add(metric3);
+    entity12.setMetrics(metrics);
+    entity12.addIsRelatedToEntity("type1", "tid1_2");
+    entity12.addIsRelatedToEntity("type2", "tid2_1`");
+    TimelineEvent event15 = new TimelineEvent();
+    event15.setId("event_5");
+    event15.setTimestamp(1425016502017L);
+    entity12.addEvent(event15);
+    writeEntityFile(entity12, appDir);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    entity2.setId("id_2");
+    entity2.setType("app");
+    entity2.setCreatedTime(1425016501050L);
+    entity2.setModifiedTime(1425016502010L);
+    Map<String, Object> info2 = new HashMap<String, Object>();
+    info1.put("info2", 4);
+    entity2.addInfo(info2);
+    Map<String, String> configs2 = new HashMap<String, String>();
+    configs2.put("config_1", "123");
+    configs2.put("config_3", "def");
+    entity2.setConfigs(configs2);
+    TimelineEvent event2 = new TimelineEvent();
+    event2.setId("event_2");
+    event2.setTimestamp(1425016501003L);
+    entity2.addEvent(event2);
+    Set<TimelineMetric> metrics2 = new HashSet<TimelineMetric>();
+    TimelineMetric metric21 = new TimelineMetric();
+    metric21.setId("metric1");
+    metric21.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric21.addValue(1425016501006L, 123.2F);
+    metrics2.add(metric21);
+    TimelineMetric metric22 = new TimelineMetric();
+    metric22.setId("metric2");
+    metric22.setType(TimelineMetric.Type.TIME_SERIES);
+    metric22.addValue(1425016501056L, 31);
+    metric22.addValue(1425016501084L, 70);
+    metrics2.add(metric22);
+    TimelineMetric metric23 = new TimelineMetric();
+    metric23.setId("metric3");
+    metric23.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric23.addValue(1425016502060L, 23L);
+    metrics2.add(metric23);
+    entity2.setMetrics(metrics2);
+    entity2.addRelatesToEntity("flow", "flow2");
+    writeEntityFile(entity2, appDir);
+
+    TimelineEntity entity3 = new TimelineEntity();
+    entity3.setId("id_3");
+    entity3.setType("app");
+    entity3.setCreatedTime(1425016501050L);
+    entity3.setModifiedTime(1425016502010L);
+    Map<String, Object> info3 = new HashMap<String, Object>();
+    info3.put("info2", 3.5);
+    entity3.addInfo(info3);
+    Map<String, String> configs3 = new HashMap<String, String>();
+    configs3.put("config_1", "123");
+    configs3.put("config_3", "abc");
+    entity3.setConfigs(configs3);
+    TimelineEvent event3 = new TimelineEvent();
+    event3.setId("event_2");
+    event3.setTimestamp(1425016501003L);
+    entity3.addEvent(event3);
+    TimelineEvent event4 = new TimelineEvent();
+    event4.setId("event_4");
+    event4.setTimestamp(1425016502006L);
+    entity3.addEvent(event4);
+    Set<TimelineMetric> metrics3 = new HashSet<TimelineMetric>();
+    TimelineMetric metric31 = new TimelineMetric();
+    metric31.setId("metric1");
+    metric31.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric31.addValue(1425016501006L, 124.8F);
+    metrics3.add(metric31);
+    TimelineMetric metric32 = new TimelineMetric();
+    metric32.setId("metric2");
+    metric32.setType(TimelineMetric.Type.TIME_SERIES);
+    metric32.addValue(1425016501056L, 31);
+    metric32.addValue(1425016501084L, 74);
+    metrics3.add(metric32);
+    entity3.setMetrics(metrics3);
+    entity3.addIsRelatedToEntity("type1", "tid1_2");
+    writeEntityFile(entity3, appDir);
+
+    TimelineEntity entity4 = new TimelineEntity();
+    entity4.setId("id_4");
+    entity4.setType("app");
+    entity4.setCreatedTime(1425016502050L);
+    entity4.setModifiedTime(1425016503010L);
+    TimelineEvent event44 = new TimelineEvent();
+    event44.setId("event_4");
+    event44.setTimestamp(1425016502003L);
+    entity4.addEvent(event44);
+    writeEntityFile(entity4, appDir);
+
+    File appDir2 = new File(rootDir +
+            "/entities/cluster1/user1/flow1,flow/1/app2/app/");
+    TimelineEntity entity5 = new TimelineEntity();
+    entity5.setId("id_5");
+    entity5.setType("app");
+    entity5.setCreatedTime(1425016502050L);
+    entity5.setModifiedTime(1425016503010L);
+    writeEntityFile(entity5, appDir2);
+  }
+
+  public TimelineReader getTimelineReader() {
+    return reader;
+  }
+
+  @Test
+  public void testGetEntityDefaultView() throws Exception {
+    // If no fields are specified, the entity is returned with the default
+    // view, i.e. only the id, type, created and modified times.
+    TimelineEntity result =
+        reader.getEntity("user1", "cluster1", "flow1", 1L, "app1",
+            "app", "id_1", null);
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(1425016503000L, result.getModifiedTime());
+    Assert.assertEquals(0, result.getConfigs().size());
+    Assert.assertEquals(0, result.getMetrics().size());
+  }
+
+  @Test
+  public void testGetEntityByClusterAndApp() throws Exception {
+    // Cluster and AppId should be enough to get an entity.
+    TimelineEntity result =
+        reader.getEntity(null, "cluster1", null, null, "app1",
+            "app", "id_1", null);
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(1425016503000L, result.getModifiedTime());
+    Assert.assertEquals(0, result.getConfigs().size());
+    Assert.assertEquals(0, result.getMetrics().size());
+  }
+
+  /** Checks that commas in the app flow mapping CSV are handled. */
+  @Test
+  public void testAppFlowMappingCsv() throws Exception {
+    // Test getting an entity by cluster and app where flow entry
+    // in app flow mapping csv has commas.
+    TimelineEntity result =
+        reader.getEntity(null, "cluster1", null, null, "app2",
+            "app", "id_5", null);
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_5")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502050L, result.getCreatedTime());
+    Assert.assertEquals(1425016503010L, result.getModifiedTime());
+  }
+
+  @Test
+  public void testGetEntityCustomFields() throws Exception {
+    // Specified fields in addition to default view will be returned.
+    TimelineEntity result =
+        reader.getEntity("user1", "cluster1", "flow1", 1L,
+            "app1", "app", "id_1",
+            EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(1425016503000L, result.getModifiedTime());
+    Assert.assertEquals(3, result.getConfigs().size());
+    Assert.assertEquals(3, result.getMetrics().size());
+    Assert.assertEquals(1, result.getInfo().size());
+    // No events will be returned
+    Assert.assertEquals(0, result.getEvents().size());
+  }
+
+  @Test
+  public void testGetEntityAllFields() throws Exception {
+    // All fields of TimelineEntity will be returned.
+    TimelineEntity result =
+        reader.getEntity("user1", "cluster1", "flow1", 1L,
+            "app1", "app", "id_1", EnumSet.of(Field.ALL));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals(1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(1425016503000L, result.getModifiedTime());
+    Assert.assertEquals(3, result.getConfigs().size());
+    Assert.assertEquals(3, result.getMetrics().size());
+    // All fields including events will be returned.
+    Assert.assertEquals(2, result.getEvents().size());
+  }
+
+  @Test
+  public void testGetAllEntities() throws Exception {
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, null, null, null,
+            null, null);
+    // All 4 entities will be returned.
+    Assert.assertEquals(4, result.size());
+  }
+
+  @Test
+  public void testGetEntitiesWithLimit() throws Exception {
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            2L, null, null, null, null, null, null, null, null, null,
+            null, null);
+    Assert.assertEquals(2, result.size());
+    // Needs to be rewritten once hashCode() and equals() for
+    // TimelineEntity are implemented.
+    // Entities with id_1 and id_4 should be returned,
+    // based on created time, descending.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+        Assert.fail("Entity not sorted by created time");
+      }
+    }
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            3L, null, null, null, null, null, null, null, null, null,
+            null, null);
+    // Even though 2 entities out of 4 have the same created time, one
+    // entity is left out due to the limit.
+    Assert.assertEquals(3, result.size());
+  }
+
+  @Test
+  public void testGetEntitiesByTimeWindows() throws Exception {
+    // Get entities based on created time start and end time range.
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, 1425016502030L, 1425016502060L, null, null, null, null, null,
+            null, null, null, null);
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_4 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities if only created time end is specified.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, 1425016502010L, null, null, null, null, null, null,
+            null, null, null);
+    Assert.assertEquals(3, result.size());
+    for (TimelineEntity entity : result) {
+      if (entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities if only created time start is specified.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, 1425016502010L, null, null, null, null, null, null, null,
+            null, null, null);
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities based on modified time start and end time range.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, 1425016502090L, 1425016503020L, null, null, null,
+            null, null, null, null);
+    Assert.assertEquals(2, result.size());
+    // Two entities, with IDs id_1 and id_4, should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on modified time range");
+      }
+    }
+
+    // Get entities if only modified time end is specified.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, 1425016502090L, null, null, null, null,
+            null, null, null);
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on modified time range");
+      }
+    }
+
+    // Get entities if only modified time start is specified.
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, 1425016503005L, null, null, null, null, null,
+            null, null, null);
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on modified time range");
+      }
+    }
+  }
+
+  @Test
+  public void testGetFilteredEntities() throws Exception {
+    // Get entities based on info filters.
+    Map<String, Object> infoFilters = new HashMap<String, Object>();
+    infoFilters.put("info2", 3.5);
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, infoFilters, null, null,
+            null, null);
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+
+    // Get entities based on config filters.
+    Map<String, String> configFilters = new HashMap<String, String>();
+    configFilters.put("config_1", "123");
+    configFilters.put("config_3", "abc");
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, null, configFilters, null,
+            null, null);
+    Assert.assertEquals(2, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    // Get entities based on event filters.
+    Set<String> eventFilters = new HashSet<String>();
+    eventFilters.add("event_2");
+    eventFilters.add("event_4");
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, null, null, null,
+            eventFilters, null);
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on event filters");
+      }
+    }
+
+    // Get entities based on metric filters.
+    Set<String> metricFilters = new HashSet<String>();
+    metricFilters.add("metric3");
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, null, null, null, metricFilters,
+            null, null);
+    Assert.assertEquals(2, result.size());
+    // Two entities, with IDs id_1 and id_2, should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+  }
+
+  @Test
+  public void testGetEntitiesByRelations() throws Exception {
+    // Get entities based on relatesTo.
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    Set<String> relatesToIds = new HashSet<String>();
+    relatesToIds.add("flow1");
+    relatesTo.put("flow", relatesToIds);
+    Set<TimelineEntity> result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, relatesTo, null, null, null, null,
+            null, null);
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_1 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1")) {
+        Assert.fail("Incorrect filtering based on relatesTo");
+      }
+    }
+
+    // Get entities based on isRelatedTo.
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    Set<String> isRelatedToIds = new HashSet<String>();
+    isRelatedToIds.add("tid1_2");
+    isRelatedTo.put("type1", isRelatedToIds);
+    result =
+        reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+            null, null, null, null, null, null, isRelatedTo, null, null, null,
+            null, null);
+    Assert.assertEquals(2, result.size());
+    // Two entities, with IDs id_1 and id_3, should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on isRelatedTo");
+      }
+    }
+  }
+}
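
To exercise the new reader and its test locally, something along these lines
should work from the hadoop-yarn-server-timelineservice module directory
(standard Maven Surefire invocation):

    mvn test -Dtest=TestFileSystemTimelineReaderImpl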