You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/07/07 03:15:07 UTC
hadoop git commit: YARN-3051. Created storage oriented reader
interface for fetching raw entity data and made the filesystem based
implementation. Contributed by Varun Saxena.
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 683755269 -> 499ce52c7
YARN-3051. Created storage oriented reader interface for fetching raw entity data and made the filesystem based implementation. Contributed by Varun Saxena.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/499ce52c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/499ce52c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/499ce52c
Branch: refs/heads/YARN-2928
Commit: 499ce52c7b645ec0b1cc8ac62dc9a3127b987a20
Parents: 6837552
Author: Zhijie Shen <zj...@apache.org>
Authored: Mon Jul 6 18:11:27 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Jul 6 18:11:27 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../records/timelineservice/TimelineEntity.java | 5 +
.../storage/FileSystemTimelineReaderImpl.java | 490 ++++++++++++++++
.../timelineservice/storage/TimelineReader.java | 162 ++++++
.../TestFileSystemTimelineReaderImpl.java | 556 +++++++++++++++++++
5 files changed, 1216 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 91358c1..1586e7f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -73,6 +73,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3801. [JDK-8] Exclude jdk.tools from hbase-client and
hbase-testing-util (Tsuyoshi Ozawa via sjlee)
+ YARN-3051. Created storage oriented reader interface for fetching raw entity
+ data and made the filesystem based implementation. (Varun Saxena via zjshen)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index a641f32..60fba85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.codehaus.jackson.annotate.JsonSetter;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -335,6 +336,7 @@ public class TimelineEntity {
}
}
+ @JsonSetter("isrelatedto")
public void setIsRelatedToEntities(
Map<String, Set<String>> isRelatedToEntities) {
if (real == null) {
@@ -423,6 +425,7 @@ public class TimelineEntity {
}
}
+ @JsonSetter("relatesto")
public void setRelatesToEntities(Map<String, Set<String>> relatesToEntities) {
if (real == null) {
this.relatesToEntities =
@@ -441,6 +444,7 @@ public class TimelineEntity {
}
}
+ @JsonSetter("createdtime")
public void setCreatedTime(long createdTime) {
if (real == null) {
this.createdTime = createdTime;
@@ -458,6 +462,7 @@ public class TimelineEntity {
}
}
+ @JsonSetter("modifiedtime")
public void setModifiedTime(long modifiedTime) {
if (real == null) {
this.modifiedTime = modifiedTime;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
new file mode 100644
index 0000000..f9f1d1d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * File System based implementation for TimelineReader.
+ */
+public class FileSystemTimelineReaderImpl extends AbstractService
+ implements TimelineReader {
+
+ /** Logger for this class. */
+ private static final Log LOG =
+ LogFactory.getLog(FileSystemTimelineReaderImpl.class);
+
+ /** Storage root directory; assigned from configuration in serviceInit. */
+ private String rootPath;
+ /** Name of the directory under the root path that holds entity data. */
+ private static final String ENTITIES_DIR = "entities";
+
+ /** Extension of the per-entity storage files read by this class. */
+ private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
+
+ /**
+ * Name of the CSV file, looked up under each cluster directory, that maps an
+ * application to its user, flow and flow run.
+ */
+ @VisibleForTesting
+ static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
+
+ /** Config param for timeline service file system storage root. */
+ @VisibleForTesting
+ static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+ /** Default value for storage location on local disk. */
+ @VisibleForTesting
+ static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+ "/tmp/timeline_service_data";
+
+ /** Column layout used to parse the app-to-flow mapping CSV file. */
+ private final CSVFormat csvFormat =
+ CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+
+ /**
+ * Creates the reader, passing this class's name to the superclass
+ * constructor.
+ */
+ public FileSystemTimelineReaderImpl() {
+ super(FileSystemTimelineReaderImpl.class.getName());
+ }
+
+ /**
+ * Returns the storage root path currently held by this reader.
+ * @return the root path; null until serviceInit has assigned it
+ */
+ @VisibleForTesting
+ String getRootPath() {
+ return rootPath;
+ }
+
+ private static ObjectMapper mapper;
+
+ static {
+ mapper = new ObjectMapper();
+ YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
+ }
+
+ /**
+ * Deserialize a POJO object from a JSON string.
+ *
+ * @param jsonString
+ * json string to deserialize
+ * @param clazz
+ * class of the object to be deserialized
+ * @param <T>
+ * type of the object to be returned
+ * @return an object of type {@code clazz} read from {@code jsonString}
+ * @throws IOException
+ * propagated from the mapper's readValue call
+ * @throws JsonMappingException
+ * propagated from the mapper's readValue call
+ * @throws JsonGenerationException
+ * declared by the signature
+ */
+ public static <T> T getTimelineRecordFromJSON(
+ String jsonString, Class<T> clazz)
+ throws JsonGenerationException, JsonMappingException, IOException {
+ return mapper.readValue(jsonString, clazz);
+ }
+
+ /**
+  * Copies the requested fields from the entity read from storage into the
+  * entity that is handed back to the caller.
+  *
+  * @param finalEntity entity being populated
+  * @param real entity read from storage, used as the source of the values
+  * @param fields fields to copy; if it contains {@link Field#ALL} every
+  *     field is copied and the remaining members of the set are ignored
+  */
+ private static void fillFields(TimelineEntity finalEntity,
+     TimelineEntity real, EnumSet<Field> fields) {
+   if (fields.contains(Field.ALL)) {
+     finalEntity.setConfigs(real.getConfigs());
+     finalEntity.setMetrics(real.getMetrics());
+     finalEntity.setInfo(real.getInfo());
+     finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+     // Relates-to is a distinct relation from is-related-to and must be
+     // copied through its own accessor pair.
+     finalEntity.setRelatesToEntities(real.getRelatesToEntities());
+     finalEntity.setEvents(real.getEvents());
+     return;
+   }
+   for (Field field : fields) {
+     switch (field) {
+     case CONFIGS:
+       finalEntity.setConfigs(real.getConfigs());
+       break;
+     case METRICS:
+       finalEntity.setMetrics(real.getMetrics());
+       break;
+     case INFO:
+       finalEntity.setInfo(real.getInfo());
+       break;
+     case IS_RELATED_TO:
+       finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+       break;
+     case RELATES_TO:
+       finalEntity.setRelatesToEntities(real.getRelatesToEntities());
+       break;
+     case EVENTS:
+       finalEntity.setEvents(real.getEvents());
+       break;
+     default:
+       continue;
+     }
+   }
+ }
+
+ /**
+ * Tells whether a value stored on an entity equals the value asked for by a
+ * filter, using {@code infoValue.equals(filterValue)}.
+ *
+ * @param infoValue value found on the entity; dereferenced, so must not be
+ * null
+ * @param filterValue value the filter expects
+ * @return true if the two values are equal
+ */
+ private static boolean matchFilter(Object infoValue, Object filterValue) {
+ return infoValue.equals(filterValue);
+ }
+
+ /**
+  * Checks that every key/value pair in {@code filters} is present, with an
+  * equal value, in {@code entityInfo}.
+  *
+  * @param entityInfo key/value pairs held by the entity
+  * @param filters key/value pairs that must all be matched
+  * @return false if {@code entityInfo} is null or empty, or if any filter
+  *     key is missing or maps to a different value; true otherwise
+  */
+ private static boolean matchFilters(Map<String, ? extends Object> entityInfo,
+     Map<String, ? extends Object> filters) {
+   boolean nothingToMatchAgainst = entityInfo == null || entityInfo.isEmpty();
+   if (nothingToMatchAgainst) {
+     return false;
+   }
+   for (Map.Entry<String, ? extends Object> expected : filters.entrySet()) {
+     Object actual = entityInfo.get(expected.getKey());
+     if (actual == null || !matchFilter(actual, expected.getValue())) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Builds the {@code user/flow/flowRun} part of an application's storage
+  * path.
+  *
+  * <p>When user, flow and flow run are all supplied they are used as is.
+  * Otherwise the cluster's app-to-flow mapping CSV file is read and the
+  * first matching row supplies the three values.
+  *
+  * @param userId user id, may be null
+  * @param clusterId cluster id; required when the mapping file is needed
+  * @param flowId flow id, may be null
+  * @param flowRunId flow run id, may be null
+  * @param appId application id; required when the mapping file is needed
+  * @return the relative path {@code user/flow/flowRun}
+  * @throws IOException if the flow context is incomplete and either
+  *     {@code clusterId} or {@code appId} is null, if the mapping file cannot
+  *     be read, or if no row matches
+  */
+ private String getFlowRunPath(String userId, String clusterId, String flowId,
+     Long flowRunId, String appId)
+     throws IOException {
+   if (userId != null && flowId != null && flowRunId != null) {
+     return userId + "/" + flowId + "/" + flowRunId;
+   }
+   if (clusterId == null || appId == null) {
+     throw new IOException("Unable to get flow info");
+   }
+   String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
+       clusterId + "/" + APP_FLOW_MAPPING_FILE;
+   // Reader and parser are both closed by try-with-resources, so no explicit
+   // close() call is needed inside the block.
+   try (BufferedReader reader =
+       new BufferedReader(new InputStreamReader(
+           new FileInputStream(
+               appFlowMappingFile), Charset.forName("UTF-8")));
+       CSVParser parser = new CSVParser(reader, csvFormat)) {
+     for (CSVRecord record : parser.getRecords()) {
+       // Rows that do not carry all four columns cannot be used.
+       if (record.size() < 4) {
+         continue;
+       }
+       String applicationId = record.get("APP");
+       // NOTE(review): a row whose APP column is blank is accepted for any
+       // appId; confirm this is intended before tightening the check.
+       if (applicationId != null && !applicationId.trim().isEmpty() &&
+           !applicationId.trim().equals(appId)) {
+         continue;
+       }
+       // Columns are addressed by header name, consistently with APP above.
+       return record.get("USER").trim() + "/" + record.get("FLOW").trim() +
+           "/" + record.get("FLOWRUN").trim();
+     }
+   }
+   throw new IOException("Unable to get flow info");
+ }
+
+ /**
+  * Checks that the entity carries a metric for every requested metric id.
+  *
+  * @param metrics metrics held by the entity
+  * @param metricFilters metric ids that must all be present
+  * @return true if every id in {@code metricFilters} is the id of some
+  *     metric in {@code metrics}
+  */
+ private static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+     Set<String> metricFilters) {
+   // Collect the ids once so that the membership test is a set lookup.
+   Set<String> presentIds = new HashSet<String>();
+   for (TimelineMetric candidate : metrics) {
+     presentIds.add(candidate.getId());
+   }
+   return presentIds.containsAll(metricFilters);
+ }
+
+ /**
+  * Checks that the entity carries an event for every requested event id.
+  *
+  * @param entityEvents events held by the entity
+  * @param eventFilters event ids that must all be present
+  * @return true if every id in {@code eventFilters} is the id of some event
+  *     in {@code entityEvents}
+  */
+ private static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+     Set<String> eventFilters) {
+   // Collect the ids once so that the membership test is a set lookup.
+   Set<String> presentIds = new HashSet<String>();
+   for (TimelineEvent candidate : entityEvents) {
+     presentIds.add(candidate.getId());
+   }
+   return presentIds.containsAll(eventFilters);
+ }
+
+ /**
+  * Builds the entity handed back to the caller: identifier, created time and
+  * modified time are always copied, the remaining fields only on request.
+  *
+  * @param entity entity read from storage
+  * @param fieldsToRetrieve extra fields to copy; null copies none
+  * @return a new entity holding the selected data
+  */
+ private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
+     EnumSet<Field> fieldsToRetrieve) {
+   TimelineEntity result = new TimelineEntity();
+   result.setIdentifier(entity.getIdentifier());
+   result.setCreatedTime(entity.getCreatedTime());
+   result.setModifiedTime(entity.getModifiedTime());
+   if (fieldsToRetrieve == null) {
+     return result;
+   }
+   fillFields(result, entity, fieldsToRetrieve);
+   return result;
+ }
+
+ /**
+  * Tells whether a timestamp lies in the closed interval
+  * {@code [timeBegin, timeEnd]}.
+  *
+  * @param time timestamp to test
+  * @param timeBegin inclusive lower bound
+  * @param timeEnd inclusive upper bound
+  * @return true if {@code timeBegin <= time <= timeEnd}
+  */
+ private static boolean isTimeInRange(Long time, Long timeBegin,
+     Long timeEnd) {
+   if (time < timeBegin) {
+     return false;
+   }
+   return time <= timeEnd;
+ }
+
+ /**
+  * Checks that the entity's relations include every requested relation.
+  *
+  * @param entityRelations relations held by the entity, keyed by type
+  * @param relations requested relations, keyed by type
+  * @return true if, for every type in {@code relations}, the entity has that
+  *     type and its id set contains all requested ids
+  */
+ private static boolean matchRelations(
+     Map<String, Set<String>> entityRelations,
+     Map<String, Set<String>> relations) {
+   for (Map.Entry<String, Set<String>> wanted : relations.entrySet()) {
+     Set<String> presentIds = entityRelations.get(wanted.getKey());
+     if (presentIds == null || !presentIds.containsAll(wanted.getValue())) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Folds the contents of {@code entity2} into {@code entity1}.
+  *
+  * <p>Positive created/modified times overwrite those of {@code entity1};
+  * configs, info, relations and events are added; a metric whose id already
+  * exists on {@code entity1} has its values added to the existing metric,
+  * any other metric is added as a new one.
+  *
+  * @param entity1 entity that receives the merged data
+  * @param entity2 entity whose data is merged in
+  */
+ private static void mergeEntities(TimelineEntity entity1,
+     TimelineEntity entity2) {
+   // Ideally created time wont change except in the case of issue from client.
+   if (entity2.getCreatedTime() > 0) {
+     entity1.setCreatedTime(entity2.getCreatedTime());
+   }
+   if (entity2.getModifiedTime() > 0) {
+     entity1.setModifiedTime(entity2.getModifiedTime());
+   }
+   for (Entry<String, String> config : entity2.getConfigs().entrySet()) {
+     entity1.addConfig(config.getKey(), config.getValue());
+   }
+   for (Entry<String, Object> info : entity2.getInfo().entrySet()) {
+     entity1.addInfo(info.getKey(), info.getValue());
+   }
+   for (Entry<String, Set<String>> byType :
+       entity2.getIsRelatedToEntities().entrySet()) {
+     for (String relatedId : byType.getValue()) {
+       entity1.addIsRelatedToEntity(byType.getKey(), relatedId);
+     }
+   }
+   for (Entry<String, Set<String>> byType :
+       entity2.getRelatesToEntities().entrySet()) {
+     for (String relatedId : byType.getValue()) {
+       entity1.addRelatesToEntity(byType.getKey(), relatedId);
+     }
+   }
+   for (TimelineEvent event : entity2.getEvents()) {
+     entity1.addEvent(event);
+   }
+   for (TimelineMetric incoming : entity2.getMetrics()) {
+     TimelineMetric existing =
+         findMetricById(entity1.getMetrics(), incoming.getId());
+     if (existing != null) {
+       existing.addValues(incoming.getValues());
+     } else {
+       entity1.addMetric(incoming);
+     }
+   }
+ }
+
+ /** Returns the first metric with the given id, or null if there is none. */
+ private static TimelineMetric findMetricById(Set<TimelineMetric> metrics,
+     String metricId) {
+   for (TimelineMetric metric : metrics) {
+     if (metric.getId().equals(metricId)) {
+       return metric;
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Reads one entity from a storage file in which every non-blank line is a
+  * JSON record.
+  *
+  * <p>The first record establishes the entity. Each later record with the
+  * same id and type is merged into it; records with a different id or type
+  * are ignored. Blank lines are skipped wherever they occur.
+  *
+  * @param reader reader positioned at the start of the file
+  * @return the merged entity
+  * @throws IOException if the file contains no record, or if reading or
+  *     deserializing a line fails
+  */
+ private static TimelineEntity readEntityFromFile(BufferedReader reader)
+     throws IOException {
+   TimelineEntity entity = null;
+   String line;
+   while ((line = reader.readLine()) != null) {
+     if (line.trim().isEmpty()) {
+       continue;
+     }
+     TimelineEntity record =
+         getTimelineRecordFromJSON(line, TimelineEntity.class);
+     if (entity == null) {
+       // The first record found is the one all later records merge into.
+       entity = record;
+       continue;
+     }
+     if (!entity.getId().equals(record.getId()) ||
+         !entity.getType().equals(record.getType())) {
+       continue;
+     }
+     mergeEntities(entity, record);
+   }
+   if (entity == null) {
+     // An empty file would otherwise hand a null string to the JSON mapper.
+     throw new IOException("No timeline entity found in file");
+   }
+   return entity;
+ }
+
+ /**
+  * Reads every entity file in {@code dir}, keeps the entities that pass all
+  * filters, and returns at most {@code limit} of them, most recently created
+  * first.
+  *
+  * <p>A null or non-positive limit becomes {@code DEFAULT_LIMIT}; null or
+  * non-positive begin times become 0 and end times {@link Long#MAX_VALUE}.
+  * A filter that is null or empty is not applied.
+  *
+  * @param dir directory holding the entity files of one entity type
+  * @param entityType type the returned entities must have
+  * @param limit maximum number of entities to return
+  * @param createdTimeBegin inclusive lower bound on created time
+  * @param createdTimeEnd inclusive upper bound on created time
+  * @param modifiedTimeBegin inclusive lower bound on modified time
+  * @param modifiedTimeEnd inclusive upper bound on modified time
+  * @param relatesTo relations the entity must relate to
+  * @param isRelatedTo relations the entity must be related to
+  * @param infoFilters info key/value pairs the entity must have
+  * @param configFilters config key/value pairs the entity must have
+  * @param metricFilters metric ids the entity must have
+  * @param eventFilters event ids the entity must have
+  * @param fieldsToRetrieve fields to copy into the returned entities
+  * @return matching entities in an insertion-ordered set, sorted by created
+  *     time descending; empty if {@code dir} cannot be listed
+  * @throws IOException if an entity file cannot be read or parsed
+  */
+ private Set<TimelineEntity> getEntities(File dir, String entityType,
+     Long limit, Long createdTimeBegin,
+     Long createdTimeEnd, Long modifiedTimeBegin, Long modifiedTimeEnd,
+     Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+     Map<String, Object> infoFilters, Map<String, String> configFilters,
+     Set<String> metricFilters, Set<String> eventFilters,
+     EnumSet<Field> fieldsToRetrieve) throws IOException {
+   if (limit == null || limit <= 0) {
+     limit = DEFAULT_LIMIT;
+   }
+   if (createdTimeBegin == null || createdTimeBegin <= 0) {
+     createdTimeBegin = 0L;
+   }
+   if (createdTimeEnd == null || createdTimeEnd <= 0) {
+     createdTimeEnd = Long.MAX_VALUE;
+   }
+   if (modifiedTimeBegin == null || modifiedTimeBegin <= 0) {
+     modifiedTimeBegin = 0L;
+   }
+   if (modifiedTimeEnd == null || modifiedTimeEnd <= 0) {
+     modifiedTimeEnd = Long.MAX_VALUE;
+   }
+
+   // A LinkedHashSet keeps the created-time-descending order in which the
+   // entities are inserted below.
+   Set<TimelineEntity> entities = new LinkedHashSet<TimelineEntity>();
+
+   // listFiles() returns null when dir is not a directory or cannot be read;
+   // in that case there is nothing to return.
+   File[] entityFiles = dir.listFiles();
+   if (entityFiles == null) {
+     return entities;
+   }
+
+   // First sort the selected entities based on created/start time.
+   Map<Long, Set<TimelineEntity>> sortedEntities =
+       new TreeMap<>(
+           new Comparator<Long>() {
+             @Override
+             public int compare(Long l1, Long l2) {
+               return l2.compareTo(l1);
+             }
+           }
+       );
+   for (File entityFile : entityFiles) {
+     // NOTE(review): contains() also accepts names that merely embed the
+     // extension (e.g. a backup copy); confirm whether endsWith() is meant.
+     if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
+       continue;
+     }
+     try (BufferedReader reader =
+         new BufferedReader(
+             new InputStreamReader(
+                 new FileInputStream(
+                     entityFile), Charset.forName("UTF-8")))) {
+       TimelineEntity entity = readEntityFromFile(reader);
+       if (!entity.getType().equals(entityType)) {
+         continue;
+       }
+       if (!isTimeInRange(entity.getCreatedTime(), createdTimeBegin,
+           createdTimeEnd)) {
+         continue;
+       }
+       if (!isTimeInRange(entity.getModifiedTime(), modifiedTimeBegin,
+           modifiedTimeEnd)) {
+         continue;
+       }
+       if (relatesTo != null && !relatesTo.isEmpty() &&
+           !matchRelations(entity.getRelatesToEntities(), relatesTo)) {
+         continue;
+       }
+       if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
+           !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
+         continue;
+       }
+       if (infoFilters != null && !infoFilters.isEmpty() &&
+           !matchFilters(entity.getInfo(), infoFilters)) {
+         continue;
+       }
+       if (configFilters != null && !configFilters.isEmpty() &&
+           !matchFilters(entity.getConfigs(), configFilters)) {
+         continue;
+       }
+       if (metricFilters != null && !metricFilters.isEmpty() &&
+           !matchMetricFilters(entity.getMetrics(), metricFilters)) {
+         continue;
+       }
+       if (eventFilters != null && !eventFilters.isEmpty() &&
+           !matchEventFilters(entity.getEvents(), eventFilters)) {
+         continue;
+       }
+       TimelineEntity entityToBeReturned =
+           createEntityToBeReturned(entity, fieldsToRetrieve);
+       Set<TimelineEntity> entitiesCreatedAtSameTime =
+           sortedEntities.get(entityToBeReturned.getCreatedTime());
+       if (entitiesCreatedAtSameTime == null) {
+         entitiesCreatedAtSameTime = new HashSet<TimelineEntity>();
+         sortedEntities.put(
+             entityToBeReturned.getCreatedTime(), entitiesCreatedAtSameTime);
+       }
+       entitiesCreatedAtSameTime.add(entityToBeReturned);
+     }
+   }
+
+   long entitiesAdded = 0;
+   for (Set<TimelineEntity> entitySet : sortedEntities.values()) {
+     for (TimelineEntity entity : entitySet) {
+       entities.add(entity);
+       ++entitiesAdded;
+       if (entitiesAdded >= limit) {
+         return entities;
+       }
+     }
+   }
+   return entities;
+ }
+
+ /**
+ * Reads the storage root path from the configuration key
+ * {@link #TIMELINE_SERVICE_STORAGE_DIR_ROOT}, falling back to
+ * {@link #DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT} when it is not set, and
+ * then delegates to the superclass.
+ *
+ * @param conf configuration to read the root path from
+ * @throws Exception propagated from the superclass serviceInit
+ */
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+ DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+ super.serviceInit(conf);
+ }
+
+ /**
+  * Reads a single entity from the file
+  * {@code <root>/entities/<cluster>/<user>/<flow>/<flowRun>/<app>/<type>/<id>}
+  * plus the storage extension, and returns it with the requested fields.
+  */
+ @Override
+ public TimelineEntity getEntity(String userId, String clusterId,
+     String flowId, Long flowRunId, String appId, String entityType,
+     String entityId, EnumSet<Field> fieldsToRetrieve) throws IOException {
+   String flowRunPath = getFlowRunPath(userId, clusterId, flowId,
+       flowRunId, appId);
+   File entitiesRoot = new File(rootPath, ENTITIES_DIR);
+   String relativeDir =
+       clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType;
+   File typeDir = new File(entitiesRoot, relativeDir);
+   String fileName = entityId + TIMELINE_SERVICE_STORAGE_EXTENSION;
+   File entityFile = new File(typeDir, fileName);
+   try (BufferedReader reader =
+       new BufferedReader(new InputStreamReader(
+           new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+     TimelineEntity stored = readEntityFromFile(reader);
+     return createEntityToBeReturned(stored, fieldsToRetrieve);
+   }
+ }
+
+ /**
+  * Resolves the directory
+  * {@code <root>/entities/<cluster>/<user>/<flow>/<flowRun>/<app>/<type>} and
+  * returns the entities in it that pass the given filters.
+  */
+ @Override
+ public Set<TimelineEntity> getEntities(String userId, String clusterId,
+     String flowId, Long flowRunId, String appId, String entityType,
+     Long limit, Long createdTimeBegin, Long createdTimeEnd,
+     Long modifiedTimeBegin, Long modifiedTimeEnd,
+     Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+     Map<String, Object> infoFilters, Map<String, String> configFilters,
+     Set<String> metricFilters, Set<String> eventFilters,
+     EnumSet<Field> fieldsToRetrieve) throws IOException {
+   String flowRunPath =
+       getFlowRunPath(userId, clusterId, flowId, flowRunId, appId);
+   File entitiesRoot = new File(rootPath, ENTITIES_DIR);
+   String relativeDir =
+       clusterId + "/" + flowRunPath + "/" + appId + "/" + entityType;
+   File typeDir = new File(entitiesRoot, relativeDir);
+   return getEntities(typeDir, entityType, limit,
+       createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+       relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+       eventFilters, fieldsToRetrieve);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
new file mode 100644
index 0000000..e4e305e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+
+/** ATSv2 reader interface. */
+@Private
+@Unstable
+public interface TimelineReader extends Service {
+
+ /**
+ * Default limit on the number of entities returned by {@link #getEntities},
+ * used when the caller passes a null or non-positive limit.
+ */
+ long DEFAULT_LIMIT = 100;
+
+ /**
+ * Possible fields to retrieve for {@link #getEntities} and
+ * {@link #getEntity}.
+ */
+ public enum Field {
+ /** Every field listed below. */
+ ALL,
+ /** The events of the entity. */
+ EVENTS,
+ /** The info of the entity. */
+ INFO,
+ /** The metrics of the entity. */
+ METRICS,
+ /** The configs of the entity. */
+ CONFIGS,
+ /** The entities this entity relates to. */
+ RELATES_TO,
+ /** The entities this entity is related to. */
+ IS_RELATED_TO
+ }
+
+ /**
+ * <p>The API to fetch the single entity given the entity identifier in the
+ * scope of the given context.</p>
+ *
+ * @param userId
+ * Context user Id (optional).
+ * @param clusterId
+ * Context cluster Id (mandatory).
+ * @param flowId
+ * Context flow Id (optional).
+ * @param flowRunId
+ * Context flow run Id (optional).
+ * @param appId
+ * Context app Id (mandatory).
+ * @param entityType
+ * Entity type (mandatory).
+ * @param entityId
+ * Entity Id (mandatory).
+ * @param fieldsToRetrieve
+ * Specifies which fields of the entity object to retrieve (optional), see
+ * {@link Field}. If null, retrieves 4 fields namely entity id,
+ * entity type, entity created time and entity modified time. All
+ * fields will be returned if {@link Field#ALL} is specified.
+ * @return a {@link TimelineEntity} instance or null. The entity will
+ * contain the metadata plus the given fields to retrieve.
+ * @throws IOException
+ * if the entity cannot be read from the storage.
+ */
+ TimelineEntity getEntity(String userId, String clusterId, String flowId,
+ Long flowRunId, String appId, String entityType, String entityId,
+ EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+ /**
+ * <p>The API to search for a set of entities of the given entity type in
+ * the scope of the given context which matches the given predicates. The
+ * predicates include the created/modified time window, limit to number of
+ * entities to be returned, and the entities can be filtered by checking
+ * whether they contain the given info/configs entries in the form of
+ * key/value pairs, given metrics in the form of metricsIds and its relation
+ * with metric values, given events in the form of the Ids, and whether they
+ * relate to/are related to other entities. For those parameters which have
+ * multiple entries, the qualified entity needs to meet all of them.</p>
+ *
+ * @param userId
+ * Context user Id (optional).
+ * @param clusterId
+ * Context cluster Id (mandatory).
+ * @param flowId
+ * Context flow Id (optional).
+ * @param flowRunId
+ * Context flow run Id (optional).
+ * @param appId
+ * Context app Id (mandatory).
+ * @param entityType
+ * Entity type (mandatory).
+ * @param limit
+ * A limit on the number of entities to return (optional). If null or <=0,
+ * defaults to {@link #DEFAULT_LIMIT}.
+ * @param createdTimeBegin
+ * Matched entities should not be created before this timestamp (optional).
+ * If null or <=0, defaults to 0.
+ * @param createdTimeEnd
+ * Matched entities should not be created after this timestamp (optional).
+ * If null or <=0, defaults to {@link Long#MAX_VALUE}.
+ * @param modifiedTimeBegin
+ * Matched entities should not be modified before this timestamp
+ * (optional). If null or <=0, defaults to 0.
+ * @param modifiedTimeEnd
+ * Matched entities should not be modified after this timestamp (optional).
+ * If null or <=0, defaults to {@link Long#MAX_VALUE}.
+ * @param relatesTo
+ * Matched entities should relate to given entities (optional).
+ * @param isRelatedTo
+ * Matched entities should be related to given entities (optional).
+ * @param infoFilters
+ * Matched entities should have exact matches to the given info represented
+ * as key-value pairs (optional). If null or empty, the filter is not
+ * applied.
+ * @param configFilters
+ * Matched entities should have exact matches to the given configs
+ * represented as key-value pairs (optional). If null or empty, the filter
+ * is not applied.
+ * @param metricFilters
+ * Matched entities should contain the given metrics (optional). If null
+ * or empty, the filter is not applied.
+ * @param eventFilters
+ * Matched entities should contain the given events (optional). If null
+ * or empty, the filter is not applied.
+ * @param fieldsToRetrieve
+ * Specifies which fields of the entity object to retrieve (optional), see
+ * {@link Field}. If null, retrieves 4 fields namely entity id,
+ * entity type, entity created time and entity modified time. All
+ * fields will be returned if {@link Field#ALL} is specified.
+ * @return A set of {@link TimelineEntity} instances of the given entity type
+ * in the given context scope which matches the given predicates
+ * ordered by created time, descending. Each entity will only contain the
+ * metadata (id, type, created and modified times) plus the given fields to
+ * retrieve.
+ * @throws IOException
+ * if the entities cannot be read from the storage.
+ */
+ Set<TimelineEntity> getEntities(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ Long limit, Long createdTimeBegin, Long createdTimeEnd,
+ Long modifiedTimeBegin, Long modifiedTimeEnd,
+ Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+ Map<String, Object> infoFilters, Map<String, String> configFilters,
+ Set<String> metricFilters, Set<String> eventFilters,
+ EnumSet<Field> fieldsToRetrieve) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/499ce52c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
new file mode 100644
index 0000000..4e23e49
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileSystemTimelineReaderImpl {
+
+ private static final String rootDir =
+ FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+ FileSystemTimelineReaderImpl reader;
+
+ /**
+  * Writes the entity fixture files and then the app-to-flow mapping CSV
+  * under the cluster1 directory of the storage root.
+  */
+ @BeforeClass
+ public static void setup() throws Exception {
+   loadEntityData();
+   // Create app flow mapping file.
+   CSVFormat format =
+       CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+   String appFlowMappingFile = rootDir + "/entities/cluster1/" +
+       FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
+   // try-with-resources closes the printer and then the writer on exit,
+   // so no explicit close() call is needed inside the block.
+   try (PrintWriter out =
+       new PrintWriter(new BufferedWriter(
+           new FileWriter(appFlowMappingFile, true)));
+       CSVPrinter printer = new CSVPrinter(out, format)) {
+     printer.printRecord("app1", "user1", "flow1", 1);
+     // The flow name of app2 deliberately contains a comma; it is the case
+     // exercised by testAppFlowMappingCsv.
+     printer.printRecord("app2", "user1", "flow1,flow", 1);
+   }
+   (new File(rootDir)).deleteOnExit();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ FileUtils.deleteDirectory(new File(rootDir));
+ }
+
+ /** Creates a reader configured with the test storage root. */
+ @Before
+ public void init() throws Exception {
+   Configuration conf = new YarnConfiguration();
+   conf.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+       rootDir);
+   reader = new FileSystemTimelineReaderImpl();
+   reader.init(conf);
+ }
+
+ /**
+  * Appends the output of TimelineUtils.dumpTimelineRecordtoJSON for the
+  * given entity, followed by an empty line, to the file named after the
+  * entity id with a ".thist" suffix inside dir. The directory is created
+  * first if it does not exist.
+  *
+  * @param entity entity to write.
+  * @param dir directory that holds the entity file.
+  * @throws Exception if dir cannot be created or the file cannot be opened.
+  */
+ private static void writeEntityFile(TimelineEntity entity, File dir)
+     throws Exception {
+   if (!dir.exists() && !dir.mkdirs()) {
+     throw new IOException("Could not create directories for " + dir);
+   }
+   String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
+   // The file is opened in append mode, so several records written for the
+   // same entity id accumulate in one file. try-with-resources flushes and
+   // closes the writer; an explicit close() is not needed.
+   try (PrintWriter out =
+       new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))) {
+     out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+     out.write("\n");
+   }
+ }
+
+ private static void loadEntityData() throws Exception {
+ File appDir = new File(rootDir +
+ "/entities/cluster1/user1/flow1/1/app1/app/");
+ TimelineEntity entity11 = new TimelineEntity();
+ entity11.setId("id_1");
+ entity11.setType("app");
+ entity11.setCreatedTime(1425016502000L);
+ entity11.setModifiedTime(1425016502050L);
+ Map<String, Object> info1 = new HashMap<String, Object>();
+ info1.put("info1", "val1");
+ entity11.addInfo(info1);
+ TimelineEvent event = new TimelineEvent();
+ event.setId("event_1");
+ event.setTimestamp(1425016502003L);
+ entity11.addEvent(event);
+ Set<TimelineMetric> metrics = new HashSet<TimelineMetric>();
+ TimelineMetric metric1 = new TimelineMetric();
+ metric1.setId("metric1");
+ metric1.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric1.addValue(1425016502006L, 113.2F);
+ metrics.add(metric1);
+ TimelineMetric metric2 = new TimelineMetric();
+ metric2.setId("metric2");
+ metric2.setType(TimelineMetric.Type.TIME_SERIES);
+ metric2.addValue(1425016502016L, 34);
+ metrics.add(metric2);
+ entity11.setMetrics(metrics);
+ Map<String,String> configs = new HashMap<String, String>();
+ configs.put("config_1", "123");
+ entity11.setConfigs(configs);
+ entity11.addRelatesToEntity("flow", "flow1");
+ entity11.addIsRelatedToEntity("type1", "tid1_1");
+ writeEntityFile(entity11, appDir);
+ TimelineEntity entity12 = new TimelineEntity();
+ entity12.setId("id_1");
+ entity12.setType("app");
+ entity12.setModifiedTime(1425016503000L);
+ configs.clear();
+ configs.put("config_2", "23");
+ configs.put("config_3", "abc");
+ entity12.addConfigs(configs);
+ metrics.clear();
+ TimelineMetric metric12 = new TimelineMetric();
+ metric12.setId("metric2");
+ metric12.setType(TimelineMetric.Type.TIME_SERIES);
+ metric12.addValue(1425016502032L, 48);
+ metric12.addValue(1425016502054L, 51);
+ metrics.add(metric12);
+ TimelineMetric metric3 = new TimelineMetric();
+ metric3.setId("metric3");
+ metric3.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric3.addValue(1425016502060L, 23L);
+ metrics.add(metric3);
+ entity12.setMetrics(metrics);
+ entity12.addIsRelatedToEntity("type1", "tid1_2");
+ entity12.addIsRelatedToEntity("type2", "tid2_1`");
+ TimelineEvent event15 = new TimelineEvent();
+ event15.setId("event_5");
+ event15.setTimestamp(1425016502017L);
+ entity12.addEvent(event15);
+ writeEntityFile(entity12, appDir);
+
+ TimelineEntity entity2 = new TimelineEntity();
+ entity2.setId("id_2");
+ entity2.setType("app");
+ entity2.setCreatedTime(1425016501050L);
+ entity2.setModifiedTime(1425016502010L);
+ Map<String, Object> info2 = new HashMap<String, Object>();
+ info1.put("info2", 4);
+ entity2.addInfo(info2);
+ Map<String,String> configs2 = new HashMap<String, String>();
+ configs2.put("config_1", "123");
+ configs2.put("config_3", "def");
+ entity2.setConfigs(configs2);
+ TimelineEvent event2 = new TimelineEvent();
+ event2.setId("event_2");
+ event2.setTimestamp(1425016501003L);
+ entity2.addEvent(event2);
+ Set<TimelineMetric> metrics2 = new HashSet<TimelineMetric>();
+ TimelineMetric metric21 = new TimelineMetric();
+ metric21.setId("metric1");
+ metric21.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric21.addValue(1425016501006L, 123.2F);
+ metrics2.add(metric21);
+ TimelineMetric metric22 = new TimelineMetric();
+ metric22.setId("metric2");
+ metric22.setType(TimelineMetric.Type.TIME_SERIES);
+ metric22.addValue(1425016501056L, 31);
+ metric22.addValue(1425016501084L, 70);
+ metrics2.add(metric22);
+ TimelineMetric metric23 = new TimelineMetric();
+ metric23.setId("metric3");
+ metric23.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric23.addValue(1425016502060L, 23L);
+ metrics2.add(metric23);
+ entity2.setMetrics(metrics2);
+ entity2.addRelatesToEntity("flow", "flow2");
+ writeEntityFile(entity2, appDir);
+
+ TimelineEntity entity3 = new TimelineEntity();
+ entity3.setId("id_3");
+ entity3.setType("app");
+ entity3.setCreatedTime(1425016501050L);
+ entity3.setModifiedTime(1425016502010L);
+ Map<String, Object> info3 = new HashMap<String, Object>();
+ info3.put("info2", 3.5);
+ entity3.addInfo(info3);
+ Map<String,String> configs3 = new HashMap<String, String>();
+ configs3.put("config_1", "123");
+ configs3.put("config_3", "abc");
+ entity3.setConfigs(configs3);
+ TimelineEvent event3 = new TimelineEvent();
+ event3.setId("event_2");
+ event3.setTimestamp(1425016501003L);
+ entity3.addEvent(event3);
+ TimelineEvent event4 = new TimelineEvent();
+ event4.setId("event_4");
+ event4.setTimestamp(1425016502006L);
+ entity3.addEvent(event4);
+ Set<TimelineMetric> metrics3 = new HashSet<TimelineMetric>();
+ TimelineMetric metric31 = new TimelineMetric();
+ metric31.setId("metric1");
+ metric31.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric31.addValue(1425016501006L, 124.8F);
+ metrics3.add(metric31);
+ TimelineMetric metric32 = new TimelineMetric();
+ metric32.setId("metric2");
+ metric32.setType(TimelineMetric.Type.TIME_SERIES);
+ metric32.addValue(1425016501056L, 31);
+ metric32.addValue(1425016501084L, 74);
+ metrics3.add(metric32);
+ entity3.setMetrics(metrics3);
+ entity3.addIsRelatedToEntity("type1", "tid1_2");
+ writeEntityFile(entity3, appDir);
+
+ TimelineEntity entity4 = new TimelineEntity();
+ entity4.setId("id_4");
+ entity4.setType("app");
+ entity4.setCreatedTime(1425016502050L);
+ entity4.setModifiedTime(1425016503010L);
+ TimelineEvent event44 = new TimelineEvent();
+ event44.setId("event_4");
+ event44.setTimestamp(1425016502003L);
+ entity4.addEvent(event44);
+ writeEntityFile(entity4, appDir);
+
+ File appDir2 = new File(rootDir +
+ "/entities/cluster1/user1/flow1,flow/1/app2/app/");
+ TimelineEntity entity5 = new TimelineEntity();
+ entity5.setId("id_5");
+ entity5.setType("app");
+ entity5.setCreatedTime(1425016502050L);
+ entity5.setModifiedTime(1425016503010L);
+ writeEntityFile(entity5, appDir2);
+ }
+
+ /** @return the reader held in this test instance's reader field. */
+ public TimelineReader getTimelineReader() {
+   return this.reader;
+ }
+
+ @Test
+ public void testGetEntityDefaultView() throws Exception {
+   // Without any requested fields only the default view is expected:
+   // identifier plus created and modified time.
+   String expectedIdentifier =
+       (new TimelineEntity.Identifier("app", "id_1")).toString();
+   TimelineEntity entity =
+       reader.getEntity("user1", "cluster1", "flow1", 1L, "app1",
+           "app", "id_1", null);
+   Assert.assertEquals(expectedIdentifier, entity.getIdentifier().toString());
+   Assert.assertEquals(1425016502000L, entity.getCreatedTime());
+   Assert.assertEquals(1425016503000L, entity.getModifiedTime());
+   // Configs and metrics are not part of the default view.
+   Assert.assertEquals(0, entity.getConfigs().size());
+   Assert.assertEquals(0, entity.getMetrics().size());
+ }
+
+ @Test
+ public void testGetEntityByClusterAndApp() throws Exception {
+   // User, flow and flow run are left null: cluster and app id alone
+   // should be enough to locate the entity.
+   String expectedIdentifier =
+       (new TimelineEntity.Identifier("app", "id_1")).toString();
+   TimelineEntity entity =
+       reader.getEntity(null, "cluster1", null, null, "app1",
+           "app", "id_1", null);
+   Assert.assertEquals(expectedIdentifier, entity.getIdentifier().toString());
+   Assert.assertEquals(1425016502000L, entity.getCreatedTime());
+   Assert.assertEquals(1425016503000L, entity.getModifiedTime());
+   Assert.assertEquals(0, entity.getConfigs().size());
+   Assert.assertEquals(0, entity.getMetrics().size());
+ }
+
+ /** Verifies that commas inside app flow mapping csv values are handled. */
+ @Test
+ public void testAppFlowMappingCsv() throws Exception {
+   // app2 is looked up by cluster and app only; its flow entry in the
+   // app flow mapping csv contains a comma.
+   String expectedIdentifier =
+       (new TimelineEntity.Identifier("app", "id_5")).toString();
+   TimelineEntity entity =
+       reader.getEntity(null, "cluster1", null, null, "app2",
+           "app", "id_5", null);
+   Assert.assertEquals(expectedIdentifier, entity.getIdentifier().toString());
+   Assert.assertEquals(1425016502050L, entity.getCreatedTime());
+   Assert.assertEquals(1425016503010L, entity.getModifiedTime());
+ }
+
+ @Test
+ public void testGetEntityCustomFields() throws Exception {
+   // The requested fields are expected on top of the default view.
+   EnumSet<Field> requested =
+       EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS);
+   String expectedIdentifier =
+       (new TimelineEntity.Identifier("app", "id_1")).toString();
+   TimelineEntity entity =
+       reader.getEntity("user1", "cluster1", "flow1", 1L,
+           "app1", "app", "id_1", requested);
+   Assert.assertEquals(expectedIdentifier, entity.getIdentifier().toString());
+   Assert.assertEquals(1425016502000L, entity.getCreatedTime());
+   Assert.assertEquals(1425016503000L, entity.getModifiedTime());
+   Assert.assertEquals(3, entity.getConfigs().size());
+   Assert.assertEquals(3, entity.getMetrics().size());
+   Assert.assertEquals(1, entity.getInfo().size());
+   // Events were not requested, so none are expected.
+   Assert.assertEquals(0, entity.getEvents().size());
+ }
+
+ @Test
+ public void testGetEntityAllFields() throws Exception {
+   // Field.ALL asks for every field of the TimelineEntity.
+   EnumSet<Field> requested = EnumSet.of(Field.ALL);
+   String expectedIdentifier =
+       (new TimelineEntity.Identifier("app", "id_1")).toString();
+   TimelineEntity entity =
+       reader.getEntity("user1", "cluster1", "flow1", 1L,
+           "app1", "app", "id_1", requested);
+   Assert.assertEquals(expectedIdentifier, entity.getIdentifier().toString());
+   Assert.assertEquals(1425016502000L, entity.getCreatedTime());
+   Assert.assertEquals(1425016503000L, entity.getModifiedTime());
+   Assert.assertEquals(3, entity.getConfigs().size());
+   Assert.assertEquals(3, entity.getMetrics().size());
+   // Unlike the custom-field case, events are included as well.
+   Assert.assertEquals(2, entity.getEvents().size());
+ }
+
+ @Test
+ public void testGetAllEntities() throws Exception {
+ Set<TimelineEntity> result =
+ reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+ null, null, null, null, null, null, null, null, null, null,
+ null, null);
+ // All 4 entities (id_1 to id_4) written under app1 will be returned
+ Assert.assertEquals(4, result.size());
+ }
+
+ @Test
+ public void testGetEntitiesWithLimit() throws Exception {
+ Set<TimelineEntity> result =
+ reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+ 2L, null, null, null, null, null, null, null, null, null,
+ null, null);
+ Assert.assertEquals(2, result.size());
+ // Needs to be rewritten once hashcode and equals for
+ // TimelineEntity is implemented
+ // Entities with id_1 and id_4 should be returned,
+ // based on created time, descending.
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+ Assert.fail("Entity not sorted by created time");
+ }
+ }
+ result =
+ reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+ 3L, null, null, null, null, null, null, null, null, null,
+ null, null);
+ // Even though 2 entities out of 4 have same created time, one entity
+ // is left out due to limit
+ Assert.assertEquals(3, result.size());
+ }
+
+ @Test
+ public void testGetEntitiesByTimeWindows() throws Exception {
+ // Get entities based on created time start and end time range.
+ Set<TimelineEntity> result =
+ reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+ null, 1425016502030L, 1425016502060L, null, null, null, null, null,
+ null, null, null, null);
+ Assert.assertEquals(1, result.size());
+ // Only one entity with ID id_4 should be returned.
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_4")) {
+ Assert.fail("Incorrect filtering based on created time range");
+ }
+ }
+
+ // Get entities if only created time end is specified.
+ result =
+ reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+ null, null, 1425016502010L, null, null, null, null, null, null,
+ null, null, null);
+ Assert.assertEquals(3, result.size());
+ for (TimelineEntity entity : result) {
+ if (entity.getId().equals("id_4")) {
+ Assert.fail("Incorrect filtering based on created time range");
+ }
+ }
+
+ // Get entities if only created time start is specified.
+ result =
+ reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+ null, 1425016502010L, null, null, null, null, null, null, null,
+ null, null, null);
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_4")) {
+ Assert.fail("Incorrect filtering based on created time range");
+ }
+ }
+
+ // Get entities based on modified time start and end time range.
+ result =
+ reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+ null, null, null, 1425016502090L, 1425016503020L, null, null, null,
+ null, null, null, null);
+ Assert.assertEquals(2, result.size());
+ // Two entities with IDs' id_1 and id_4 should be returned.
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+ Assert.fail("Incorrect filtering based on modified time range");
+ }
+ }
+
+ // Get entities if only modified time end is specified.
+ result =
+ reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+ null, null, null, null, 1425016502090L, null, null, null, null,
+ null, null, null);
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) {
+ Assert.fail("Incorrect filtering based on modified time range");
+ }
+ }
+
+ // Get entities if only modified time start is specified.
+ result =
+ reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+ null, null, null, 1425016503005L, null, null, null, null, null,
+ null, null, null);
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_4")) {
+ Assert.fail("Incorrect filtering based on modified time range");
+ }
+ }
+ }
+
+ @Test
+ public void testGetFilteredEntities() throws Exception {
+   // Info filter: id_3 is the only entity expected for info2 = 3.5.
+   Map<String, Object> infoFilters = new HashMap<String, Object>();
+   infoFilters.put("info2", 3.5);
+   Set<TimelineEntity> entities =
+       reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+           null, null, null, null, null, null, null, infoFilters, null, null,
+           null, null);
+   Assert.assertEquals(1, entities.size());
+   for (TimelineEntity e : entities) {
+     Assert.assertTrue("Incorrect filtering based on info filters",
+         e.getId().equals("id_3"));
+   }
+
+   // Config filters: id_1 and id_3 are expected.
+   Map<String, String> configFilters = new HashMap<String, String>();
+   configFilters.put("config_1", "123");
+   configFilters.put("config_3", "abc");
+   entities =
+       reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+           null, null, null, null, null, null, null, null, configFilters, null,
+           null, null);
+   Assert.assertEquals(2, entities.size());
+   for (TimelineEntity e : entities) {
+     String id = e.getId();
+     Assert.assertTrue("Incorrect filtering based on config filters",
+         id.equals("id_1") || id.equals("id_3"));
+   }
+
+   // Event filters: id_3 is the only entity expected.
+   Set<String> eventFilters = new HashSet<String>();
+   eventFilters.add("event_2");
+   eventFilters.add("event_4");
+   entities =
+       reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+           null, null, null, null, null, null, null, null, null, null,
+           eventFilters, null);
+   Assert.assertEquals(1, entities.size());
+   for (TimelineEntity e : entities) {
+     Assert.assertTrue("Incorrect filtering based on event filters",
+         e.getId().equals("id_3"));
+   }
+
+   // Metric filters: the two entities with IDs id_1 and id_2 are expected.
+   Set<String> metricFilters = new HashSet<String>();
+   metricFilters.add("metric3");
+   entities =
+       reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+           null, null, null, null, null, null, null, null, null, metricFilters,
+           null, null);
+   Assert.assertEquals(2, entities.size());
+   for (TimelineEntity e : entities) {
+     String id = e.getId();
+     Assert.assertTrue("Incorrect filtering based on metric filters",
+         id.equals("id_1") || id.equals("id_2"));
+   }
+ }
+
+ @Test
+ public void testGetEntitiesByRelations() throws Exception {
+   // relatesTo filter (flow -> flow1): only id_1 is expected.
+   Set<String> relatesToIds = new HashSet<String>();
+   relatesToIds.add("flow1");
+   Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+   relatesTo.put("flow", relatesToIds);
+   Set<TimelineEntity> entities =
+       reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+           null, null, null, null, null, relatesTo, null, null, null, null,
+           null, null);
+   Assert.assertEquals(1, entities.size());
+   for (TimelineEntity e : entities) {
+     Assert.assertTrue("Incorrect filtering based on relatesTo",
+         e.getId().equals("id_1"));
+   }
+
+   // isRelatedTo filter (type1 -> tid1_2): the two entities with IDs id_1
+   // and id_3 are expected.
+   Set<String> isRelatedToIds = new HashSet<String>();
+   isRelatedToIds.add("tid1_2");
+   Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+   isRelatedTo.put("type1", isRelatedToIds);
+   entities =
+       reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
+           null, null, null, null, null, null, isRelatedTo, null, null, null,
+           null, null);
+   Assert.assertEquals(2, entities.size());
+   for (TimelineEntity e : entities) {
+     String id = e.getId();
+     Assert.assertTrue("Incorrect filtering based on isRelatedTo",
+         id.equals("id_1") || id.equals("id_3"));
+   }
+ }
+}