You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2015/09/22 22:47:31 UTC
[1/2] hadoop git commit: YARN-4074. [timeline reader] implement
support for querying for flows and flow runs (sjlee via vrushali)
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 4b37985e6 -> 2e7e0f0bb
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index c514c20..889ae19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -20,13 +20,8 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
import java.util.Set;
-import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,47 +29,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
-import com.google.common.base.Preconditions;
public class HBaseTimelineReaderImpl
extends AbstractService implements TimelineReader {
private static final Log LOG = LogFactory
.getLog(HBaseTimelineReaderImpl.class);
- private static final long DEFAULT_BEGIN_TIME = 0L;
- private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
private Configuration hbaseConf = null;
private Connection conn;
- private EntityTable entityTable;
- private AppToFlowTable appToFlowTable;
- private ApplicationTable applicationTable;
public HBaseTimelineReaderImpl() {
super(HBaseTimelineReaderImpl.class.getName());
@@ -85,9 +50,6 @@ public class HBaseTimelineReaderImpl
super.serviceInit(conf);
hbaseConf = HBaseConfiguration.create(conf);
conn = ConnectionFactory.createConnection(hbaseConf);
- entityTable = new EntityTable();
- appToFlowTable = new AppToFlowTable();
- applicationTable = new ApplicationTable();
}
@Override
@@ -104,35 +66,10 @@ public class HBaseTimelineReaderImpl
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve)
throws IOException {
- validateParams(userId, clusterId, appId, entityType, entityId, true);
- // In reality both should be null or neither should be null
- if (flowId == null || flowRunId == null) {
- FlowContext context = lookupFlowContext(clusterId, appId);
- flowId = context.flowId;
- flowRunId = context.flowRunId;
- }
- if (fieldsToRetrieve == null) {
- fieldsToRetrieve = EnumSet.noneOf(Field.class);
- }
-
- boolean isApplication = isApplicationEntity(entityType);
- byte[] rowKey = isApplication ?
- ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
- appId) :
- EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
- entityType, entityId);
- Get get = new Get(rowKey);
- get.setMaxVersions(Integer.MAX_VALUE);
- Result result = isApplication ?
- applicationTable.getResult(hbaseConf, conn, get) :
- entityTable.getResult(hbaseConf, conn, get);
- return parseEntity(result, fieldsToRetrieve,
- false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME, false, DEFAULT_BEGIN_TIME,
- DEFAULT_END_TIME, null, null, null, null, null, null, isApplication);
- }
-
- private static boolean isApplicationEntity(String entityType) {
- return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType);
+ TimelineEntityReader reader =
+ TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
+ flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve);
+ return reader.readEntity(hbaseConf, conn);
}
@Override
@@ -144,361 +81,12 @@ public class HBaseTimelineReaderImpl
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
EnumSet<Field> fieldsToRetrieve) throws IOException {
- validateParams(userId, clusterId, appId, entityType, null, false);
- // In reality both should be null or neither should be null
- if (flowId == null || flowRunId == null) {
- FlowContext context = lookupFlowContext(clusterId, appId);
- flowId = context.flowId;
- flowRunId = context.flowRunId;
- }
- if (limit == null) {
- limit = TimelineReader.DEFAULT_LIMIT;
- }
- if (createdTimeBegin == null) {
- createdTimeBegin = DEFAULT_BEGIN_TIME;
- }
- if (createdTimeEnd == null) {
- createdTimeEnd = DEFAULT_END_TIME;
- }
- if (modifiedTimeBegin == null) {
- modifiedTimeBegin = DEFAULT_BEGIN_TIME;
- }
- if (modifiedTimeEnd == null) {
- modifiedTimeEnd = DEFAULT_END_TIME;
- }
- if (fieldsToRetrieve == null) {
- fieldsToRetrieve = EnumSet.noneOf(Field.class);
- }
-
- NavigableSet<TimelineEntity> entities = new TreeSet<>();
- boolean isApplication = isApplicationEntity(entityType);
- if (isApplication) {
- // If getEntities() is called for an application, there can be at most
- // one entity. If the entity passes the filter, it is returned. Otherwise,
- // an empty set is returned.
- byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
- flowRunId, appId);
- Get get = new Get(rowKey);
- get.setMaxVersions(Integer.MAX_VALUE);
- Result result = applicationTable.getResult(hbaseConf, conn, get);
- TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
- true, createdTimeBegin, createdTimeEnd, true, modifiedTimeBegin,
- modifiedTimeEnd, isRelatedTo, relatesTo, infoFilters, configFilters,
- eventFilters, metricFilters, isApplication);
- if (entity != null) {
- entities.add(entity);
- }
- } else {
- // Scan through part of the table to find the entities belong to one app
- // and one type
- Scan scan = new Scan();
- scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
- clusterId, userId, flowId, flowRunId, appId, entityType));
- scan.setMaxVersions(Integer.MAX_VALUE);
- ResultScanner scanner =
- entityTable.getResultScanner(hbaseConf, conn, scan);
- for (Result result : scanner) {
- TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
- true, createdTimeBegin, createdTimeEnd,
- true, modifiedTimeBegin, modifiedTimeEnd,
- isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
- metricFilters, isApplication);
- if (entity == null) {
- continue;
- }
- if (entities.size() > limit) {
- entities.pollLast();
- }
- entities.add(entity);
- }
- }
- return entities;
- }
-
- private FlowContext lookupFlowContext(String clusterId, String appId)
- throws IOException {
- byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
- Get get = new Get(rowKey);
- Result result = appToFlowTable.getResult(hbaseConf, conn, get);
- if (result != null && !result.isEmpty()) {
- return new FlowContext(
- AppToFlowColumn.FLOW_ID.readResult(result).toString(),
- ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
- } else {
- throw new IOException(
- "Unable to find the context flow ID and flow run ID for clusterId=" +
- clusterId + ", appId=" + appId);
- }
- }
-
- private static class FlowContext {
- private String flowId;
- private Long flowRunId;
- public FlowContext(String flowId, Long flowRunId) {
- this.flowId = flowId;
- this.flowRunId = flowRunId;
- }
- }
-
- private static void validateParams(String userId, String clusterId,
- String appId, String entityType, String entityId, boolean checkEntityId) {
- Preconditions.checkNotNull(userId, "userId shouldn't be null");
- Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
- Preconditions.checkNotNull(appId, "appId shouldn't be null");
- Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
- if (checkEntityId) {
- Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
- }
- }
-
- private static TimelineEntity parseEntity(
- Result result, EnumSet<Field> fieldsToRetrieve,
- boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
- boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
- Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
- Map<String, Object> infoFilters, Map<String, String> configFilters,
- Set<String> eventFilters, Set<String> metricFilters,
- boolean isApplication)
- throws IOException {
- if (result == null || result.isEmpty()) {
- return null;
- }
- TimelineEntity entity = new TimelineEntity();
- String entityType = isApplication ?
- TimelineEntityType.YARN_APPLICATION.toString() :
- EntityColumn.TYPE.readResult(result).toString();
- entity.setType(entityType);
- String entityId = isApplication ?
- ApplicationColumn.ID.readResult(result).toString() :
- EntityColumn.ID.readResult(result).toString();
- entity.setId(entityId);
-
- // fetch created time
- Number createdTime = isApplication ?
- (Number)ApplicationColumn.CREATED_TIME.readResult(result) :
- (Number)EntityColumn.CREATED_TIME.readResult(result);
- entity.setCreatedTime(createdTime.longValue());
- if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
- entity.getCreatedTime() > createdTimeEnd)) {
- return null;
- }
-
- // fetch modified time
- Number modifiedTime = isApplication ?
- (Number)ApplicationColumn.MODIFIED_TIME.readResult(result) :
- (Number)EntityColumn.MODIFIED_TIME.readResult(result);
- entity.setModifiedTime(modifiedTime.longValue());
- if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
- entity.getModifiedTime() > modifiedTimeEnd)) {
- return null;
- }
-
- // fetch is related to entities
- boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
- if (isApplication) {
- readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
- true);
- } else {
- readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO,
- true);
- }
- if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
- entity.getIsRelatedToEntities(), isRelatedTo)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
- entity.getIsRelatedToEntities().clear();
- }
- }
-
- // fetch relates to entities
- boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
- if (isApplication) {
- readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
- false);
- } else {
- readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
- }
- if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
- entity.getRelatesToEntities(), relatesTo)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.RELATES_TO)) {
- entity.getRelatesToEntities().clear();
- }
- }
-
- // fetch info
- boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
- if (isApplication) {
- readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
- } else {
- readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
- }
- if (checkInfo &&
- !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.INFO)) {
- entity.getInfo().clear();
- }
- }
-
- // fetch configs
- boolean checkConfigs = configFilters != null && configFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
- if (isApplication) {
- readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
- } else {
- readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
- }
- if (checkConfigs && !TimelineReaderUtils.matchFilters(
- entity.getConfigs(), configFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.CONFIGS)) {
- entity.getConfigs().clear();
- }
- }
-
- // fetch events
- boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
- readEvents(entity, result, isApplication);
- if (checkEvents && !TimelineReaderUtils.matchEventFilters(
- entity.getEvents(), eventFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.EVENTS)) {
- entity.getEvents().clear();
- }
- }
-
- // fetch metrics
- boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
- readMetrics(entity, result, isApplication);
- if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
- entity.getMetrics(), metricFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.METRICS)) {
- entity.getMetrics().clear();
- }
- }
- return entity;
- }
-
- private static <T> void readRelationship(
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
- boolean isRelatedTo) throws IOException {
- // isRelatedTo and relatesTo are of type Map<String, Set<String>>
- Map<String, Object> columns = prefix.readResults(result);
- for (Map.Entry<String, Object> column : columns.entrySet()) {
- for (String id : Separator.VALUES.splitEncoded(
- column.getValue().toString())) {
- if (isRelatedTo) {
- entity.addIsRelatedToEntity(column.getKey(), id);
- } else {
- entity.addRelatesToEntity(column.getKey(), id);
- }
- }
- }
- }
-
- private static <T> void readKeyValuePairs(
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
- boolean isConfig) throws IOException {
- // info and configuration are of type Map<String, Object or String>
- Map<String, Object> columns = prefix.readResults(result);
- if (isConfig) {
- for (Map.Entry<String, Object> column : columns.entrySet()) {
- entity.addConfig(column.getKey(), column.getValue().toString());
- }
- } else {
- entity.addInfo(columns);
- }
- }
-
- /**
- * Read events from the entity table or the application table. The column name
- * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
- * if there is no info associated with the event.
- *
- * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
- * schema description.
- */
- private static void readEvents(TimelineEntity entity, Result result,
- boolean isApplication) throws IOException {
- Map<String, TimelineEvent> eventsMap = new HashMap<>();
- Map<?, Object> eventsResult = isApplication ?
- ApplicationColumnPrefix.EVENT.
- readResultsHavingCompoundColumnQualifiers(result) :
- EntityColumnPrefix.EVENT.
- readResultsHavingCompoundColumnQualifiers(result);
- for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
- byte[][] karr = (byte[][])eventResult.getKey();
- // the column name is of the form "eventId=timestamp=infoKey"
- if (karr.length == 3) {
- String id = Bytes.toString(karr[0]);
- long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
- String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
- TimelineEvent event = eventsMap.get(key);
- if (event == null) {
- event = new TimelineEvent();
- event.setId(id);
- event.setTimestamp(ts);
- eventsMap.put(key, event);
- }
- // handle empty info
- String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
- if (infoKey != null) {
- event.addInfo(infoKey, eventResult.getValue());
- }
- } else {
- LOG.warn("incorrectly formatted column name: it will be discarded");
- continue;
- }
- }
- Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
- entity.addEvents(eventsSet);
- }
-
- private static void readMetrics(TimelineEntity entity, Result result,
- boolean isApplication) throws IOException {
- NavigableMap<String, NavigableMap<Long, Number>> metricsResult;
- if (isApplication) {
- metricsResult =
- ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result);
- } else {
- metricsResult =
- EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
- }
- for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
- metricsResult.entrySet()) {
- TimelineMetric metric = new TimelineMetric();
- metric.setId(metricResult.getKey());
- // Simply assume that if the value set contains more than 1 elements, the
- // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
- metric.setType(metricResult.getValue().size() > 1 ?
- TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
- metric.addValues(metricResult.getValue());
- entity.addMetric(metric);
- }
+ TimelineEntityReader reader =
+ TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
+ clusterId, flowId, flowRunId, appId, entityType, limit,
+ createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
+ modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
+ metricFilters, eventFilters, fieldsToRetrieve);
+ return reader.readEntities(hbaseConf, conn);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
new file mode 100644
index 0000000..0d1134c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+
+/**
+ * The base class for reading and deserializing timeline entities from the
+ * HBase storage. Different types can be defined for different types of the
+ * entities that are being requested.
+ */
+abstract class TimelineEntityReader {
+ protected final boolean singleEntityRead;
+
+ protected String userId;
+ protected String clusterId;
+ protected String flowId;
+ protected Long flowRunId;
+ protected String appId;
+ protected String entityType;
+ protected EnumSet<Field> fieldsToRetrieve;
+ // used only for a single entity read mode
+ protected String entityId;
+ // used only for multiple entity read mode
+ protected Long limit;
+ protected Long createdTimeBegin;
+ protected Long createdTimeEnd;
+ protected Long modifiedTimeBegin;
+ protected Long modifiedTimeEnd;
+ protected Map<String, Set<String>> relatesTo;
+ protected Map<String, Set<String>> isRelatedTo;
+ protected Map<String, Object> infoFilters;
+ protected Map<String, String> configFilters;
+ protected Set<String> metricFilters;
+ protected Set<String> eventFilters;
+
+ /**
+ * Main table the entity reader uses.
+ */
+ protected BaseTable<?> table;
+
+ /**
+ * Instantiates a reader for multiple-entity reads.
+ */
+ protected TimelineEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ Long limit, Long createdTimeBegin, Long createdTimeEnd,
+ Long modifiedTimeBegin, Long modifiedTimeEnd,
+ Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+ Map<String, Object> infoFilters, Map<String, String> configFilters,
+ Set<String> metricFilters, Set<String> eventFilters,
+ EnumSet<Field> fieldsToRetrieve) {
+ this.singleEntityRead = false;
+ this.userId = userId;
+ this.clusterId = clusterId;
+ this.flowId = flowId;
+ this.flowRunId = flowRunId;
+ this.appId = appId;
+ this.entityType = entityType;
+ this.fieldsToRetrieve = fieldsToRetrieve;
+ this.limit = limit;
+ this.createdTimeBegin = createdTimeBegin;
+ this.createdTimeEnd = createdTimeEnd;
+ this.modifiedTimeBegin = modifiedTimeBegin;
+ this.modifiedTimeEnd = modifiedTimeEnd;
+ this.relatesTo = relatesTo;
+ this.isRelatedTo = isRelatedTo;
+ this.infoFilters = infoFilters;
+ this.configFilters = configFilters;
+ this.metricFilters = metricFilters;
+ this.eventFilters = eventFilters;
+
+ this.table = getTable();
+ }
+
+ /**
+ * Instantiates a reader for single-entity reads.
+ */
+ protected TimelineEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ String entityId, EnumSet<Field> fieldsToRetrieve) {
+ this.singleEntityRead = true;
+ this.userId = userId;
+ this.clusterId = clusterId;
+ this.flowId = flowId;
+ this.flowRunId = flowRunId;
+ this.appId = appId;
+ this.entityType = entityType;
+ this.fieldsToRetrieve = fieldsToRetrieve;
+ this.entityId = entityId;
+
+ this.table = getTable();
+ }
+
+ /**
+ * Reads and deserializes a single timeline entity from the HBase storage.
+ */
+ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ validateParams();
+ augmentParams(hbaseConf, conn);
+
+ Result result = getResult(hbaseConf, conn);
+ return parseEntity(result);
+ }
+
+ /**
+ * Reads and deserializes a set of timeline entities from the HBase storage.
+ * It goes through all the results available, and returns the number of
+ * entries as specified in the limit in the entity's natural sort order.
+ */
+ public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+ Connection conn) throws IOException {
+ validateParams();
+ augmentParams(hbaseConf, conn);
+
+ NavigableSet<TimelineEntity> entities = new TreeSet<>();
+ Iterable<Result> results = getResults(hbaseConf, conn);
+ for (Result result : results) {
+ TimelineEntity entity = parseEntity(result);
+ if (entity == null) {
+ continue;
+ }
+ entities.add(entity);
+ if (entities.size() > limit) {
+ entities.pollLast();
+ }
+ }
+ return entities;
+ }
+
+ /**
+ * Returns the main table to be used by the entity reader.
+ */
+ protected abstract BaseTable<?> getTable();
+
+ /**
+ * Validates the required parameters to read the entities.
+ */
+ protected abstract void validateParams();
+
+ /**
+ * Sets certain parameters to defaults if the values are not provided.
+ */
+ protected abstract void augmentParams(Configuration hbaseConf,
+ Connection conn) throws IOException;
+
+ /**
+ * Fetches a {@link Result} instance for a single-entity read.
+ *
+ * @return the {@link Result} instance or null if no such record is found.
+ */
+ protected abstract Result getResult(Configuration hbaseConf, Connection conn)
+ throws IOException;
+
+ /**
+ * Fetches an iterator for {@link Result} instances for a multi-entity read.
+ */
+ protected abstract Iterable<Result> getResults(Configuration hbaseConf,
+ Connection conn) throws IOException;
+
+ /**
+ * Given a {@link Result} instance, deserializes and creates a
+ * {@link TimelineEntity}.
+ *
+ * @return the {@link TimelineEntity} instance, or null if the {@link Result}
+ * is null or empty.
+ */
+ protected abstract TimelineEntity parseEntity(Result result)
+ throws IOException;
+
+ /**
+ * Helper method for reading and deserializing {@link TimelineMetric} objects
+ * using the specified column prefix. The timeline metrics then are added to
+ * the given timeline entity.
+ */
+ protected void readMetrics(TimelineEntity entity, Result result,
+ ColumnPrefix<?> columnPrefix) throws IOException {
+ NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+ columnPrefix.readResultsWithTimestamps(result);
+ for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+ metricsResult.entrySet()) {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setId(metricResult.getKey());
+ // Simply assume that if the value set contains more than 1 elements, the
+ // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
+ metric.setType(metricResult.getValue().size() > 1 ?
+ TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+ metric.addValues(metricResult.getValue());
+ entity.addMetric(metric);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
new file mode 100644
index 0000000..4fdef40
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+
+/**
+ * Factory methods for instantiating a timeline entity reader.
+ */
+class TimelineEntityReaderFactory {
+
+  // Utility class containing only static factory methods; prevent
+  // instantiation.
+  private TimelineEntityReaderFactory() {
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading a single entity with
+   * the specified input.
+   *
+   * @return a reader specialized for the given entity type, or a generic
+   *         entity reader for any other type
+   */
+  public static TimelineEntityReader createSingleEntityReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic entity
+    // table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, entityId, fieldsToRetrieve);
+    }
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading a set of entities
+   * with the specified input and predicates.
+   *
+   * @return a reader specialized for the given entity type, or a generic
+   *         entity reader for any other type
+   */
+  public static TimelineEntityReader createMultipleEntitiesReader(String userId,
+      String clusterId, String flowId, Long flowRunId, String appId,
+      String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic entity
+    // table are application, flow run, and flow activity entities. The
+    // branches are kept in the same order as createSingleEntityReader for
+    // consistency.
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters,
+          fieldsToRetrieve);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index 5f3868b..e3b5a87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -19,14 +19,46 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
/**
* Represents a rowkey for the application table.
*/
public class ApplicationRowKey {
- // TODO: more methods are needed for this class.
+ // Components of the application table row key; flowRunId is stored in
+ // inverted form in the serialized key (see getRowKey/parseRowKey).
+ private final String clusterId;
+ private final String userId;
+ private final String flowId;
+ private final long flowRunId;
+ private final String appId;
- // TODO: API needs to be cleaned up.
+ public ApplicationRowKey(String clusterId, String userId, String flowId,
+ long flowRunId, String appId) {
+ this.clusterId = clusterId;
+ this.userId = userId;
+ this.flowId = flowId;
+ this.flowRunId = flowRunId;
+ this.appId = appId;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public long getFlowRunId() {
+ return flowRunId;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
/**
* Constructs a row key for the application table as follows:
@@ -46,22 +78,32 @@ public class ApplicationRowKey {
flowId));
// Note that flowRunId is a long, so we can't encode them all at the same
// time.
- byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId));
+ byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
byte[] third = Bytes.toBytes(appId);
return Separator.QUALIFIERS.join(first, second, third);
}
/**
- * Converts a timestamp into its inverse timestamp to be used in (row) keys
- * where we want to have the most recent timestamp in the top of the table
- * (scans start at the most recent timestamp first).
- *
- * @param key value to be inverted so that the latest version will be first in
- * a scan.
- * @return inverted long
+ * Given the raw row key as bytes, returns the row key as an object.
*/
- public static long invert(Long key) {
- return Long.MAX_VALUE - key;
- }
+ public static ApplicationRowKey parseRowKey(byte[] rowKey) {
+ // Expected layout: clusterId!userId!flowId!inverted(flowRunId)!appId
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+ if (rowKeyComponents.length < 5) {
+ throw new IllegalArgumentException("the row key is not valid for " +
+ "an application");
+ }
+ String clusterId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+ String userId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+ String flowId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+ // flowRunId is written inverted so that the latest run sorts first;
+ // invert() restores the original value.
+ long flowRunId =
+ TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+ String appId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+ return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index ad4fec6..ca88056 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -24,6 +24,22 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
* Represents a rowkey for the app_flow table.
*/
public class AppToFlowRowKey {
+ // Components of the app_flow table row key: clusterId!appId.
+ private final String clusterId;
+ private final String appId;
+
+ public AppToFlowRowKey(String clusterId, String appId) {
+ this.clusterId = clusterId;
+ this.appId = appId;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
/**
* Constructs a row key prefix for the app_flow table as follows:
* {@code clusterId!AppId}
@@ -36,4 +52,19 @@ public class AppToFlowRowKey {
return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
}
+ /**
+ * Given the raw row key as bytes, returns the row key as an object.
+ */
+ public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
+   byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+   if (rowKeyComponents.length < 2) {
+     throw new IllegalArgumentException("the row key is not valid for " +
+         "the app-to-flow table");
+   }
+
+   // The row key is written via joinEncoded() (see getRowKey above), so the
+   // components must be decoded here; this also keeps parsing consistent
+   // with the other *RowKey.parseRowKey() implementations.
+   String clusterId =
+       Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+   String appId =
+       Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+   return new AppToFlowRowKey(clusterId, appId);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
index abba79a..9545438 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
/**
- * Implements behavior common to tables used in the timeline service storage.
+ * Implements behavior common to tables used in the timeline service storage. It
+ * is thread-safe, and can be used by multiple threads concurrently.
*
* @param <T> reference to the table instance class itself for type safety.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 9a72be0..6a534ed73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
@@ -26,9 +25,52 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
* Represents a rowkey for the entity table.
*/
public class EntityRowKey {
- // TODO: more methods are needed for this class.
+ // Components of the entity table row key; flowRunId is stored in inverted
+ // form in the serialized key (see parseRowKey).
+ private final String clusterId;
+ private final String userId;
+ private final String flowId;
+ private final long flowRunId;
+ private final String appId;
+ private final String entityType;
+ private final String entityId;
- // TODO: API needs to be cleaned up.
+ public EntityRowKey(String clusterId, String userId, String flowId,
+ long flowRunId, String appId, String entityType, String entityId) {
+ this.clusterId = clusterId;
+ this.userId = userId;
+ this.flowId = flowId;
+ this.flowRunId = flowRunId;
+ this.appId = appId;
+ this.entityType = entityType;
+ this.entityId = entityId;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public long getFlowRunId() {
+ return flowRunId;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+ public String getEntityId() {
+ return entityId;
+ }
/**
* Constructs a row key prefix for the entity table as follows:
@@ -106,4 +148,32 @@ public class EntityRowKey {
return Separator.QUALIFIERS.join(first, second, third);
}
+ /**
+ * Given the raw row key as bytes, returns the row key as an object.
+ */
+ public static EntityRowKey parseRowKey(byte[] rowKey) {
+ // Expected layout (note userId comes before clusterId here, unlike the
+ // application table):
+ // userId!clusterId!flowId!inverted(flowRunId)!appId!entityType!entityId
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+ if (rowKeyComponents.length < 7) {
+ throw new IllegalArgumentException("the row key is not valid for " +
+ "an entity");
+ }
+
+ String userId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+ String clusterId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+ String flowId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+ // flowRunId is written inverted so the latest run sorts first; invert()
+ // restores the original value.
+ long flowRunId =
+ TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+ String appId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+ String entityType =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
+ String entityId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[6]));
+ return new EntityRowKey(clusterId, userId, flowId, flowRunId, appId,
+ entityType, entityId);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 19e4e83..18ca599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -55,6 +55,10 @@ public class FlowActivityRowKey {
return flowId;
}
+ /**
+ * Constructs a row key prefix for scanning all flow activity rows of the
+ * given cluster: {@code clusterId!}. The trailing empty string produces
+ * the separator at the end of the prefix.
+ */
+ public static byte[] getRowKeyPrefix(String clusterId) {
+ return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
+ }
+
/**
* Constructs a row key for the flow activity table as follows:
* {@code clusterId!dayTimestamp!user!flowId}
@@ -65,7 +69,8 @@ public class FlowActivityRowKey {
* @param flowId
* @return byte array with the row key prefix
*/
- public static byte[] getRowKey(String clusterId, String userId, String flowId) {
+ public static byte[] getRowKey(String clusterId, String userId,
+ String flowId) {
long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
.currentTimeMillis());
return getRowKey(clusterId, dayTs, userId, flowId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index e133241..880d481 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -25,7 +25,34 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWrit
* Represents a rowkey for the flow run table.
*/
public class FlowRunRowKey {
- // TODO: more methods are needed for this class like parse row key
+ // Components of the flow run table row key; flowRunId is stored in
+ // inverted form in the serialized key (see parseRowKey).
+ private final String clusterId;
+ private final String userId;
+ private final String flowId;
+ private final long flowRunId;
+
+ public FlowRunRowKey(String clusterId, String userId, String flowId,
+ long flowRunId) {
+ this.clusterId = clusterId;
+ this.userId = userId;
+ this.flowId = flowId;
+ this.flowRunId = flowRunId;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public long getFlowRunId() {
+ return flowRunId;
+ }
/**
* Constructs a row key for the entity table as follows: {
@@ -47,4 +74,25 @@ public class FlowRunRowKey {
return Separator.QUALIFIERS.join(first, second);
}
+ /**
+ * Given the raw row key as bytes, returns the row key as an object.
+ */
+ public static FlowRunRowKey parseRowKey(byte[] rowKey) {
+ // Expected layout: clusterId!userId!flowId!inverted(flowRunId)
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+ if (rowKeyComponents.length < 4) {
+ throw new IllegalArgumentException("the row key is not valid for " +
+ "a flow run");
+ }
+
+ String clusterId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+ String userId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[1]));
+ String flowId =
+ Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+ // flowRunId is written inverted so the latest run sorts first; invert()
+ // restores the original value.
+ long flowRunId =
+ TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+ return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index a1948aa..651bb3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -18,6 +18,15 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
@@ -33,15 +42,6 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
/**
* Invoked via the coprocessor when a Get or a Scan is issued for flow run
* table. Looks through the list of cells per row, checks their tags and does
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 3962341..01920b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -508,32 +508,28 @@ public class TestHBaseTimelineStorage {
private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
String flow, long runid, String appName, TimelineEntity te) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
-
- assertTrue(rowKeyComponents.length == 7);
- assertEquals(user, Bytes.toString(rowKeyComponents[0]));
- assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
- assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
- assertEquals(TimelineWriterUtils.invert(runid),
- Bytes.toLong(rowKeyComponents[3]));
- assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
- assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
- assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
+ EntityRowKey key = EntityRowKey.parseRowKey(rowKey);
+
+ assertEquals(user, key.getUserId());
+ assertEquals(cluster, key.getClusterId());
+ assertEquals(flow, key.getFlowId());
+ assertEquals(runid, key.getFlowRunId());
+ assertEquals(appName, key.getAppId());
+ assertEquals(te.getType(), key.getEntityType());
+ assertEquals(te.getId(), key.getEntityId());
return true;
}
private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster,
String user, String flow, long runid, String appName) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+ ApplicationRowKey key = ApplicationRowKey.parseRowKey(rowKey);
- assertTrue(rowKeyComponents.length == 5);
- assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
- assertEquals(user, Bytes.toString(rowKeyComponents[1]));
- assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
- assertEquals(TimelineWriterUtils.invert(runid),
- Bytes.toLong(rowKeyComponents[3]));
- assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+ assertEquals(cluster, key.getClusterId());
+ assertEquals(user, key.getUserId());
+ assertEquals(flow, key.getFlowId());
+ assertEquals(runid, key.getFlowRunId());
+ assertEquals(appName, key.getAppId());
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index f8331fa..d18613a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -45,7 +45,7 @@ class TestFlowDataGenerator {
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
- Long cTime = 1425016501000L;
+ long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
// add metrics
@@ -54,8 +54,8 @@ class TestFlowDataGenerator {
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis();
- metricValues.put(ts - 100000, 2);
- metricValues.put(ts - 80000, 40);
+ metricValues.put(ts - 100000, 2L);
+ metricValues.put(ts - 80000, 40L);
m1.setType(Type.TIME_SERIES);
m1.setValues(metricValues);
metrics.add(m1);
@@ -64,8 +64,8 @@ class TestFlowDataGenerator {
m2.setId(metric2);
metricValues = new HashMap<Long, Number>();
ts = System.currentTimeMillis();
- metricValues.put(ts - 100000, 31);
- metricValues.put(ts - 80000, 57);
+ metricValues.put(ts - 100000, 31L);
+ metricValues.put(ts - 80000, 57L);
m2.setType(Type.TIME_SERIES);
m2.setValues(metricValues);
metrics.add(m2);
@@ -80,7 +80,7 @@ class TestFlowDataGenerator {
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
- Long cTime = 1425016501000L;
+ long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
// add metrics
Set<TimelineMetric> metrics = new HashSet<>();
@@ -103,8 +103,8 @@ class TestFlowDataGenerator {
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
- Long cTime = 20000000000000L;
- Long mTime = 1425026901000L;
+ long cTime = 20000000000000L;
+ long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
entity.setModifiedTime(mTime);
// add metrics
@@ -113,10 +113,10 @@ class TestFlowDataGenerator {
m1.setId(metric1);
Map<Long, Number> metricValues = new HashMap<Long, Number>();
long ts = System.currentTimeMillis();
- metricValues.put(ts - 120000, 100000000);
- metricValues.put(ts - 100000, 200000000);
- metricValues.put(ts - 80000, 300000000);
- metricValues.put(ts - 60000, 400000000);
+ metricValues.put(ts - 120000, 100000000L);
+ metricValues.put(ts - 100000, 200000000L);
+ metricValues.put(ts - 80000, 300000000L);
+ metricValues.put(ts - 60000, 400000000L);
metricValues.put(ts - 40000, 50000000000L);
metricValues.put(ts - 20000, 60000000000L);
m1.setType(Type.TIME_SERIES);
@@ -126,7 +126,7 @@ class TestFlowDataGenerator {
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- Long expTs = 1436512802000L;
+ long expTs = 1436512802000L;
event.setTimestamp(expTs);
String expKey = "foo_event";
Object expVal = "test";
@@ -142,9 +142,9 @@ class TestFlowDataGenerator {
return entity;
}
- static TimelineEntity getEntityGreaterStartTime() {
+ static TimelineEntity getEntityGreaterStartTime(long startTs) {
TimelineEntity entity = new TimelineEntity();
- entity.setCreatedTime(30000000000000L);
+ entity.setCreatedTime(startTs);
entity.setId("flowRunHello with greater start time");
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setType(type);
@@ -173,14 +173,13 @@ class TestFlowDataGenerator {
return entity;
}
- static TimelineEntity getEntityMinStartTime() {
+ static TimelineEntity getEntityMinStartTime(long startTs) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunHelloMInStartTime";
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
- Long cTime = 10000000000000L;
- entity.setCreatedTime(cTime);
+ entity.setCreatedTime(startTs);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event.setTimestamp(System.currentTimeMillis());
@@ -195,12 +194,12 @@ class TestFlowDataGenerator {
String type = TimelineEntityType.YARN_APPLICATION.toString();
entity.setId(id);
entity.setType(type);
- Long cTime = 1425016501000L;
+ long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- Long expTs = 1436512802000L;
+ long expTs = 1436512802000L;
event.setTimestamp(expTs);
String expKey = "foo_event";
Object expVal = "test";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index b4a0c74..6bdec6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -21,19 +21,16 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -42,26 +39,17 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -119,11 +107,13 @@ public class TestHBaseStorageFlowActivity {
String user = "testWriteFlowRunMinMaxToHBase_user1";
String flow = "testing_flowRun_flow_name";
String flowVersion = "CF7022C10F1354";
- Long runid = 1002345678919L;
+ long runid = 1002345678919L;
String appName = "application_100000000000_1111";
+ long minStartTs = 10000000000000L;
+ long greaterStartTs = 30000000000000L;
long endTs = 1439750690000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator
- .getEntityMinStartTime();
+ .getEntityMinStartTime(minStartTs);
try {
hbi = new HBaseTimelineWriterImpl(c1);
@@ -146,7 +136,7 @@ public class TestHBaseStorageFlowActivity {
// write another entity with greater start time
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
- .getEntityGreaterStartTime();
+ .getEntityGreaterStartTime(greaterStartTs);
te = new TimelineEntities();
te.addEntity(entityGreaterStartTime);
appName = "application_1000000000000000_2222";
@@ -181,6 +171,31 @@ public class TestHBaseStorageFlowActivity {
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
assertEquals(1, values.size());
checkFlowActivityRunId(runid, flowVersion, values);
+
+ // use the timeline reader to verify data
+ HBaseTimelineReaderImpl hbr = null;
+ try {
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
+ // get the flow activity entity
+ Set<TimelineEntity> entities =
+ hbr.getEntities(null, cluster, null, null, null,
+ TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+ null, null, null, null, null, null, null, null, null);
+ assertEquals(1, entities.size());
+ for (TimelineEntity e : entities) {
+ FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+ assertEquals(cluster, flowActivity.getCluster());
+ assertEquals(user, flowActivity.getUser());
+ assertEquals(flow, flowActivity.getFlowName());
+ assertEquals(dayTs, flowActivity.getDate().getTime());
+ Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+ assertEquals(1, flowRuns.size());
+ }
+ } finally {
+ hbr.close();
+ }
}
/**
@@ -193,7 +208,7 @@ public class TestHBaseStorageFlowActivity {
String user = "testWriteFlowActivityOneFlow_user1";
String flow = "flow_activity_test_flow_name";
String flowVersion = "A122110F135BC4";
- Long runid = 1001111178919L;
+ long runid = 1001111178919L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
@@ -212,10 +227,35 @@ public class TestHBaseStorageFlowActivity {
}
// check flow activity
checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+
+ // use the reader to verify the data
+ HBaseTimelineReaderImpl hbr = null;
+ try {
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
+
+ Set<TimelineEntity> entities =
+ hbr.getEntities(user, cluster, flow, null, null,
+ TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+ null, null, null, null, null, null, null, null, null);
+ assertEquals(1, entities.size());
+ for (TimelineEntity e : entities) {
+ FlowActivityEntity entity = (FlowActivityEntity)e;
+ NavigableSet<FlowRunEntity> flowRuns = entity.getFlowRuns();
+ assertEquals(1, flowRuns.size());
+ for (FlowRunEntity flowRun : flowRuns) {
+ assertEquals(runid, flowRun.getRunId());
+ assertEquals(flowVersion, flowRun.getVersion());
+ }
+ }
+ } finally {
+ hbr.close();
+ }
}
private void checkFlowActivityTable(String cluster, String user, String flow,
- String flowVersion, Long runid, Configuration c1) throws IOException {
+ String flowVersion, long runid, Configuration c1) throws IOException {
Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
@@ -263,7 +303,7 @@ public class TestHBaseStorageFlowActivity {
String user = "testManyRunsFlowActivity_c_user1";
String flow = "flow_activity_test_flow_name";
String flowVersion1 = "A122110F135BC4";
- Long runid1 = 11111111111L;
+ long runid1 = 11111111111L;
String flowVersion2 = "A12222222222C4";
long runid2 = 2222222222222L;
@@ -303,11 +343,50 @@ public class TestHBaseStorageFlowActivity {
checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
runid1, flowVersion2, runid2, flowVersion3, runid3);
+ // use the timeline reader to verify data
+ HBaseTimelineReaderImpl hbr = null;
+ try {
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
+
+ Set<TimelineEntity> entities =
+ hbr.getEntities(null, cluster, null, null, null,
+ TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
+ null, null, null, null, null, null, null, null, null);
+ assertEquals(1, entities.size());
+ for (TimelineEntity e : entities) {
+ FlowActivityEntity flowActivity = (FlowActivityEntity)e;
+ assertEquals(cluster, flowActivity.getCluster());
+ assertEquals(user, flowActivity.getUser());
+ assertEquals(flow, flowActivity.getFlowName());
+ long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+ .currentTimeMillis());
+ assertEquals(dayTs, flowActivity.getDate().getTime());
+ Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
+ assertEquals(3, flowRuns.size());
+ for (FlowRunEntity flowRun : flowRuns) {
+ long runId = flowRun.getRunId();
+ String version = flowRun.getVersion();
+ if (runId == runid1) {
+ assertEquals(flowVersion1, version);
+ } else if (runId == runid2) {
+ assertEquals(flowVersion2, version);
+ } else if (runId == runid3) {
+ assertEquals(flowVersion3, version);
+ } else {
+ fail("unknown run id: " + runId);
+ }
+ }
+ }
+ } finally {
+ hbr.close();
+ }
}
private void checkFlowActivityTableSeveralRuns(String cluster, String user,
- String flow, Configuration c1, String flowVersion1, Long runid1,
- String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+ String flow, Configuration c1, String flowVersion1, long runid1,
+ String flowVersion2, long runid2, String flowVersion3, long runid3)
throws IOException {
Scan s = new Scan();
s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
@@ -351,7 +430,7 @@ public class TestHBaseStorageFlowActivity {
assertEquals(1, rowCount);
}
- private void checkFlowActivityRunId(Long runid, String flowVersion,
+ private void checkFlowActivityRunId(long runid, String flowVersion,
Map<byte[], byte[]> values) throws IOException {
byte[] rq = ColumnHelper.getColumnQualifier(
FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index bf524ea..b0f83b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -21,20 +21,15 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -42,32 +37,16 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -125,11 +104,13 @@ public class TestHBaseStorageFlowRun {
String user = "testWriteFlowRunMinMaxToHBase_user1";
String flow = "testing_flowRun_flow_name";
String flowVersion = "CF7022C10F1354";
- Long runid = 1002345678919L;
+ long runid = 1002345678919L;
String appName = "application_100000000000_1111";
+ long minStartTs = 10000000000000L;
+ long greaterStartTs = 30000000000000L;
long endTs = 1439750690000L;
TimelineEntity entityMinStartTime = TestFlowDataGenerator
- .getEntityMinStartTime();
+ .getEntityMinStartTime(minStartTs);
try {
hbi = new HBaseTimelineWriterImpl(c1);
@@ -152,7 +133,7 @@ public class TestHBaseStorageFlowRun {
// write another entity with greater start time
TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
- .getEntityGreaterStartTime();
+ .getEntityGreaterStartTime(greaterStartTs);
te = new TimelineEntities();
te.addEntity(entityGreaterStartTime);
appName = "application_1000000000000000_2222";
@@ -183,24 +164,29 @@ public class TestHBaseStorageFlowRun {
.getBytes());
assertEquals(2, r1.size());
- Long starttime = (Long) GenericObjectMapper.read(values
+ long starttime = (Long) GenericObjectMapper.read(values
.get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
- Long expmin = entityMinStartTime.getCreatedTime();
- assertEquals(expmin, starttime);
+ assertEquals(minStartTs, starttime);
assertEquals(endTs, GenericObjectMapper.read(values
.get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
- }
- boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
- String flow, Long runid) {
- byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
- assertTrue(rowKeyComponents.length == 4);
- assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
- assertEquals(user, Bytes.toString(rowKeyComponents[1]));
- assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
- assertEquals(TimelineWriterUtils.invert(runid),
- Bytes.toLong(rowKeyComponents[3]));
- return true;
+ // use the timeline reader to verify data
+ HBaseTimelineReaderImpl hbr = null;
+ try {
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
+ // get the flow run entity
+ TimelineEntity entity =
+ hbr.getEntity(user, cluster, flow, runid, null,
+ TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+ assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+ FlowRunEntity flowRun = (FlowRunEntity)entity;
+ assertEquals(minStartTs, flowRun.getStartTime());
+ assertEquals(endTs, flowRun.getMaxEndTime());
+ } finally {
+ hbr.close();
+ }
}
/**
@@ -218,7 +204,7 @@ public class TestHBaseStorageFlowRun {
String user = "testWriteFlowRunMetricsOneFlow_user1";
String flow = "testing_flowRun_metrics_flow_name";
String flowVersion = "CF7022C10F1354";
- Long runid = 1002345678919L;
+ long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
@@ -244,6 +230,41 @@ public class TestHBaseStorageFlowRun {
// check flow run
checkFlowRunTable(cluster, user, flow, runid, c1);
+
+ // use the timeline reader to verify data
+ HBaseTimelineReaderImpl hbr = null;
+ try {
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
+ TimelineEntity entity =
+ hbr.getEntity(user, cluster, flow, runid, null,
+ TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+ assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+ Set<TimelineMetric> metrics = entity.getMetrics();
+ assertEquals(2, metrics.size());
+ for (TimelineMetric metric : metrics) {
+ String id = metric.getId();
+ Map<Long, Number> values = metric.getValues();
+ assertEquals(1, values.size());
+ Number value = null;
+ for (Number n : values.values()) {
+ value = n;
+ }
+ switch (id) {
+ case metric1:
+ assertEquals(141, value);
+ break;
+ case metric2:
+ assertEquals(57, value);
+ break;
+ default:
+ fail("unrecognized metric: " + id);
+ }
+ }
+ } finally {
+ hbr.close();
+ }
}
private void checkFlowRunTable(String cluster, String user, String flow,
[2/2] hadoop git commit: YARN-4074. [timeline reader] implement
support for querying for flows and flow runs (sjlee via vrushali)
Posted by vr...@apache.org.
YARN-4074. [timeline reader] implement support for querying for flows and flow runs (sjlee via vrushali)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2e7e0f0b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2e7e0f0b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2e7e0f0b
Branch: refs/heads/YARN-2928
Commit: 2e7e0f0bbba4b3ab4280de06d595018bc5dec87b
Parents: 4b37985
Author: Vrushali <vr...@apache.org>
Authored: Tue Sep 22 13:42:30 2015 -0700
Committer: Vrushali <vr...@apache.org>
Committed: Tue Sep 22 13:42:30 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../timelineservice/FlowActivityEntity.java | 183 ++++++++
.../api/records/timelineservice/FlowEntity.java | 103 -----
.../records/timelineservice/FlowRunEntity.java | 121 ++++++
.../timelineservice/TimelineEntityType.java | 31 +-
.../TestTimelineServiceRecords.java | 14 +-
.../TestTimelineServiceClientIntegration.java | 2 +-
.../collector/TimelineCollectorWebService.java | 6 +-
.../storage/ApplicationEntityReader.java | 229 ++++++++++
.../storage/FlowActivityEntityReader.java | 168 +++++++
.../storage/FlowRunEntityReader.java | 136 ++++++
.../storage/GenericEntityReader.java | 389 +++++++++++++++++
.../storage/HBaseTimelineReaderImpl.java | 434 +------------------
.../storage/TimelineEntityReader.java | 223 ++++++++++
.../storage/TimelineEntityReaderFactory.java | 97 +++++
.../storage/application/ApplicationRowKey.java | 68 ++-
.../storage/apptoflow/AppToFlowRowKey.java | 31 ++
.../storage/common/BaseTable.java | 3 +-
.../storage/entity/EntityRowKey.java | 76 +++-
.../storage/flow/FlowActivityRowKey.java | 7 +-
.../storage/flow/FlowRunRowKey.java | 50 ++-
.../storage/flow/FlowScanner.java | 18 +-
.../storage/TestHBaseTimelineStorage.java | 34 +-
.../storage/flow/TestFlowDataGenerator.java | 39 +-
.../flow/TestHBaseStorageFlowActivity.java | 131 ++++--
.../storage/flow/TestHBaseStorageFlowRun.java | 105 +++--
26 files changed, 2021 insertions(+), 680 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cc0f014..114b2fe 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -106,6 +106,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3901. Populate flow run data in the flow_run & flow activity tables
(Vrushali C via sjlee)
+ YARN-4074. [timeline reader] implement support for querying for flows
+ and flow runs (sjlee via vrushali)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
new file mode 100644
index 0000000..163bd5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowActivityEntity.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Entity that represents a record for flow activity. It's essentially a
+ * container entity for flow runs with limited information.
+ */
+@Public
+@Unstable
+public class FlowActivityEntity extends TimelineEntity {
+ public static final String CLUSTER_INFO_KEY =
+ TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "CLUSTER";
+ public static final String DATE_INFO_KEY =
+ TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "DATE";
+ public static final String USER_INFO_KEY =
+ TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
+ public static final String FLOW_NAME_INFO_KEY =
+ TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
+
+ private final NavigableSet<FlowRunEntity> flowRuns = new TreeSet<>();
+
+ public FlowActivityEntity() {
+ super(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+ // set config to null
+ setConfigs(null);
+ }
+
+ public FlowActivityEntity(String cluster, long time, String user,
+ String flowName) {
+ this();
+ setCluster(cluster);
+ setDate(time);
+ setUser(user);
+ setFlowName(flowName);
+ }
+
+ public FlowActivityEntity(TimelineEntity entity) {
+ super(entity);
+ if (!TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entity.getType())) {
+ throw new IllegalArgumentException("Incompatible entity type: " +
+ getId());
+ }
+ // set config to null
+ setConfigs(null);
+ }
+
+ @XmlElement(name = "id")
+ @Override
+ public String getId() {
+ // flow activity: cluster/day/user@flow_name
+ String id = super.getId();
+ if (id == null) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(getCluster());
+ sb.append('/');
+ sb.append(getDate().getTime());
+ sb.append('/');
+ sb.append(getUser());
+ sb.append('@');
+ sb.append(getFlowName());
+ id = sb.toString();
+ setId(id);
+ }
+ return id;
+ }
+
+ @Override
+ public int compareTo(TimelineEntity entity) {
+ int comparison = getType().compareTo(entity.getType());
+ if (comparison == 0) {
+ // order by cluster, date (descending), user, and flow name
+ FlowActivityEntity other = (FlowActivityEntity)entity;
+ int clusterComparison = getCluster().compareTo(other.getCluster());
+ if (clusterComparison != 0) {
+ return clusterComparison;
+ }
+ int dateComparisonDescending =
+ (int)(other.getDate().getTime() - getDate().getTime()); // descending
+ if (dateComparisonDescending != 0) {
+ return dateComparisonDescending; // descending
+ }
+ int userComparison = getUser().compareTo(other.getUser());
+ if (userComparison != 0) {
+ return userComparison;
+ }
+ return getFlowName().compareTo(other.getFlowName());
+ } else {
+ return comparison;
+ }
+ }
+
+ /**
+ * Reuse the base class equals method.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ return super.equals(obj);
+ }
+
+ /**
+ * Reuse the base class hashCode method.
+ */
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ public String getCluster() {
+ return (String)getInfo().get(CLUSTER_INFO_KEY);
+ }
+
+ public void setCluster(String cluster) {
+ addInfo(CLUSTER_INFO_KEY, cluster);
+ }
+
+ public Date getDate() {
+ return (Date)getInfo().get(DATE_INFO_KEY);
+ }
+
+ public void setDate(long time) {
+ Date date = new Date(time);
+ addInfo(DATE_INFO_KEY, date);
+ }
+
+ public String getUser() {
+ return (String)getInfo().get(USER_INFO_KEY);
+ }
+
+ public void setUser(String user) {
+ addInfo(USER_INFO_KEY, user);
+ }
+
+ public String getFlowName() {
+ return (String)getInfo().get(FLOW_NAME_INFO_KEY);
+ }
+
+ public void setFlowName(String flowName) {
+ addInfo(FLOW_NAME_INFO_KEY, flowName);
+ }
+
+ public void addFlowRun(FlowRunEntity run) {
+ flowRuns.add(run);
+ }
+
+ public void addFlowRuns(Collection<FlowRunEntity> runs) {
+ flowRuns.addAll(runs);
+ }
+
+ @XmlElement(name = "flowruns")
+ public NavigableSet<FlowRunEntity> getFlowRuns() {
+ return flowRuns;
+ }
+
+ public int getNumberOfRuns() {
+ return flowRuns.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
deleted file mode 100644
index 4554778..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.api.records.timelineservice;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import javax.xml.bind.annotation.XmlElement;
-
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class FlowEntity extends HierarchicalTimelineEntity {
- public static final String USER_INFO_KEY =
- TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
- public static final String FLOW_NAME_INFO_KEY =
- TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
- public static final String FLOW_VERSION_INFO_KEY =
- TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_VERSION";
- public static final String FLOW_RUN_ID_INFO_KEY =
- TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_ID";
-
- public FlowEntity() {
- super(TimelineEntityType.YARN_FLOW.toString());
- }
-
- public FlowEntity(TimelineEntity entity) {
- super(entity);
- if (!entity.getType().equals(TimelineEntityType.YARN_FLOW.toString())) {
- throw new IllegalArgumentException("Incompatible entity type: " + getId());
- }
- }
-
- @XmlElement(name = "id")
- @Override
- public String getId() {
- //Flow id schema: user@flow_name(or id)/version/run_id
- String id = super.getId();
- if (id == null) {
- StringBuilder sb = new StringBuilder();
- sb.append(getInfo().get(USER_INFO_KEY).toString());
- sb.append('@');
- sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString());
- sb.append('/');
- sb.append(getInfo().get(FLOW_VERSION_INFO_KEY).toString());
- sb.append('/');
- sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString());
- id = sb.toString();
- setId(id);
- }
- return id;
- }
-
- public String getUser() {
- Object user = getInfo().get(USER_INFO_KEY);
- return user == null ? null : user.toString();
- }
-
- public void setUser(String user) {
- addInfo(USER_INFO_KEY, user);
- }
-
- public String getName() {
- Object name = getInfo().get(FLOW_NAME_INFO_KEY);
- return name == null ? null : name.toString();
- }
-
- public void setName(String name) {
- addInfo(FLOW_NAME_INFO_KEY, name);
- }
-
- public String getVersion() {
- Object version = getInfo().get(FLOW_VERSION_INFO_KEY);
- return version == null ? null : version.toString();
- }
-
- public void setVersion(String version) {
- addInfo(FLOW_VERSION_INFO_KEY, version);
- }
-
- public long getRunId() {
- Object runId = getInfo().get(FLOW_RUN_ID_INFO_KEY);
- return runId == null ? 0L : (Long) runId;
- }
-
- public void setRunId(long runId) {
- addInfo(FLOW_RUN_ID_INFO_KEY, runId);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
new file mode 100644
index 0000000..3c3ffb4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowRunEntity.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class FlowRunEntity extends HierarchicalTimelineEntity {
+  /** Info key under which the user that owns the flow is stored. */
+  public static final String USER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
+  /** Info key under which the flow name is stored. */
+  public static final String FLOW_NAME_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
+  /** Info key under which the flow version is stored. */
+  public static final String FLOW_VERSION_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_VERSION";
+  /** Info key under which the numeric run id is stored. */
+  public static final String FLOW_RUN_ID_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_ID";
+  /** Info key under which the latest end time seen for this run is stored. */
+  public static final String FLOW_RUN_END_TIME =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_END_TIME";
+
+  /**
+   * Creates an empty flow run entity. Flow runs carry no configs, so the
+   * configs map is explicitly set to null.
+   */
+  public FlowRunEntity() {
+    super(TimelineEntityType.YARN_FLOW_RUN.toString());
+    // set config to null
+    setConfigs(null);
+  }
+
+  /**
+   * Wraps an existing generic entity as a flow run.
+   *
+   * @param entity the entity to wrap; must be of type YARN_FLOW_RUN
+   * @throws IllegalArgumentException if the entity has a different type
+   */
+  public FlowRunEntity(TimelineEntity entity) {
+    super(entity);
+    if (!entity.getType().equals(TimelineEntityType.YARN_FLOW_RUN.toString())) {
+      // Report the offending type plus the source entity's raw id. Calling
+      // the overridden getId() here would try to synthesize an id from info
+      // keys an incompatible entity does not carry, and NPE.
+      throw new IllegalArgumentException("Incompatible entity type: "
+          + entity.getType() + " (id: " + entity.getId() + ")");
+    }
+    // set config to null
+    setConfigs(null);
+  }
+
+  /**
+   * Returns the explicitly set id if present; otherwise synthesizes one from
+   * the system info keys, caches it via setId, and returns it.
+   */
+  @XmlElement(name = "id")
+  @Override
+  public String getId() {
+    //Flow id schema: user@flow_name(or id)/run_id
+    String id = super.getId();
+    if (id == null) {
+      // String.valueOf tolerates unset info keys (renders "null") instead of
+      // throwing NPE as the bare toString() calls did.
+      StringBuilder sb = new StringBuilder();
+      sb.append(String.valueOf(getInfo().get(USER_INFO_KEY)));
+      sb.append('@');
+      sb.append(String.valueOf(getInfo().get(FLOW_NAME_INFO_KEY)));
+      sb.append('/');
+      sb.append(String.valueOf(getInfo().get(FLOW_RUN_ID_INFO_KEY)));
+      id = sb.toString();
+      setId(id);
+    }
+    return id;
+  }
+
+  public String getUser() {
+    return (String)getInfo().get(USER_INFO_KEY);
+  }
+
+  public void setUser(String user) {
+    addInfo(USER_INFO_KEY, user);
+  }
+
+  public String getName() {
+    return (String)getInfo().get(FLOW_NAME_INFO_KEY);
+  }
+
+  public void setName(String name) {
+    addInfo(FLOW_NAME_INFO_KEY, name);
+  }
+
+  public String getVersion() {
+    return (String)getInfo().get(FLOW_VERSION_INFO_KEY);
+  }
+
+  public void setVersion(String version) {
+    addInfo(FLOW_VERSION_INFO_KEY, version);
+  }
+
+  /**
+   * @return the run id, or 0 if none has been set
+   */
+  public long getRunId() {
+    Object runId = getInfo().get(FLOW_RUN_ID_INFO_KEY);
+    // The value may deserialize as an Integer rather than a Long (e.g. from
+    // JSON), so convert through Number instead of casting straight to Long.
+    return runId == null ? 0L : ((Number) runId).longValue();
+  }
+
+  public void setRunId(long runId) {
+    addInfo(FLOW_RUN_ID_INFO_KEY, runId);
+  }
+
+  /** The start time is the entity's created time. */
+  public long getStartTime() {
+    return getCreatedTime();
+  }
+
+  public void setStartTime(long startTime) {
+    setCreatedTime(startTime);
+  }
+
+  /**
+   * @return the greatest end time recorded for this run, or 0 if unset
+   */
+  public long getMaxEndTime() {
+    Object time = getInfo().get(FLOW_RUN_END_TIME);
+    // Same Integer-vs-Long deserialization caveat as getRunId().
+    return time == null ? 0L : ((Number) time).longValue();
+  }
+
+  public void setMaxEndTime(long endTime) {
+    addInfo(FLOW_RUN_END_TIME, endTime);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
index 6062fe1..ba32e20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
@@ -24,21 +24,25 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
public enum TimelineEntityType {
YARN_CLUSTER,
- YARN_FLOW,
+ YARN_FLOW_RUN,
YARN_APPLICATION,
YARN_APPLICATION_ATTEMPT,
YARN_CONTAINER,
YARN_USER,
- YARN_QUEUE;
+ YARN_QUEUE,
+ YARN_FLOW_ACTIVITY;
+ /**
+ * Whether the input type can be a parent of this entity.
+ */
public boolean isParent(TimelineEntityType type) {
switch (this) {
case YARN_CLUSTER:
return false;
- case YARN_FLOW:
- return YARN_FLOW == type || YARN_CLUSTER == type;
+ case YARN_FLOW_RUN:
+ return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION:
- return YARN_FLOW == type || YARN_CLUSTER == type;
+ return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
case YARN_APPLICATION_ATTEMPT:
return YARN_APPLICATION == type;
case YARN_CONTAINER:
@@ -50,12 +54,15 @@ public enum TimelineEntityType {
}
}
+ /**
+ * Whether the input type can be a child of this entity.
+ */
public boolean isChild(TimelineEntityType type) {
switch (this) {
case YARN_CLUSTER:
- return YARN_FLOW == type || YARN_APPLICATION == type;
- case YARN_FLOW:
- return YARN_FLOW == type || YARN_APPLICATION == type;
+ return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
+ case YARN_FLOW_RUN:
+ return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
case YARN_APPLICATION:
return YARN_APPLICATION_ATTEMPT == type;
case YARN_APPLICATION_ATTEMPT:
@@ -68,4 +75,12 @@ public enum TimelineEntityType {
return false;
}
}
+
+ /**
+ * Whether the type of this entity matches the type indicated by the input
+ * argument.
+ */
+ public boolean matches(String typeString) {
+ return toString().equals(typeString);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
index 78943e0..7c9acf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
@@ -182,14 +182,14 @@ public class TestTimelineServiceRecords {
ClusterEntity cluster = new ClusterEntity();
cluster.setId("test cluster id");
- FlowEntity flow1 = new FlowEntity();
+ FlowRunEntity flow1 = new FlowRunEntity();
//flow1.setId("test flow id 1");
flow1.setUser(user.getId());
flow1.setName("test flow name 1");
flow1.setVersion("test flow version 1");
flow1.setRunId(1L);
- FlowEntity flow2 = new FlowEntity();
+ FlowRunEntity flow2 = new FlowRunEntity();
//flow2.setId("test flow run id 2");
flow2.setUser(user.getId());
flow2.setName("test flow name 2");
@@ -213,19 +213,19 @@ public class TestTimelineServiceRecords {
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), 1).toString());
- cluster.addChild(TimelineEntityType.YARN_FLOW.toString(), flow1.getId());
+ cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
flow1
.setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId());
- flow1.addChild(TimelineEntityType.YARN_FLOW.toString(), flow2.getId());
- flow2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow1.getId());
+ flow1.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
+ flow2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(), app2.getId());
- app1.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId());
+ app1.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
appAttempt.getId());
appAttempt
.setParent(TimelineEntityType.YARN_APPLICATION.toString(), app1.getId());
- app2.setParent(TimelineEntityType.YARN_FLOW.toString(), flow2.getId());
+ app2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(),
container.getId());
container.setParent(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 69031a2..5672759 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -107,7 +107,7 @@ public class TestTimelineServiceClientIntegration {
client.start();
ClusterEntity cluster = new ClusterEntity();
cluster.setId(YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
- FlowEntity flow = new FlowEntity();
+ FlowRunEntity flow = new FlowRunEntity();
flow.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
flow.setName("test_flow_name");
flow.setVersion("test_flow_version");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
index 42fa365..8f595e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationAttemptEnti
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ClusterEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -205,8 +205,8 @@ public class TimelineCollectorWebService {
case YARN_CLUSTER:
entitiesToReturn.addEntity(new ClusterEntity(entity));
break;
- case YARN_FLOW:
- entitiesToReturn.addEntity(new FlowEntity(entity));
+ case YARN_FLOW_RUN:
+ entitiesToReturn.addEntity(new FlowRunEntity(entity));
break;
case YARN_APPLICATION:
entitiesToReturn.addEntity(new ApplicationEntity(entity));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
new file mode 100644
index 0000000..dfbc31d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+
+/**
+ * Timeline entity reader for application entities that are stored in the
+ * application table.
+ */
+class ApplicationEntityReader extends GenericEntityReader {
+ // Single shared schema descriptor for the application table.
+ private static final ApplicationTable APPLICATION_TABLE =
+ new ApplicationTable();
+
+ // Constructor used for multi-entity queries (getEntities): carries the
+ // predicates (time windows, relation/info/config/metric/event filters)
+ // that parseEntity applies to each row.
+ public ApplicationEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ Long limit, Long createdTimeBegin, Long createdTimeEnd,
+ Long modifiedTimeBegin, Long modifiedTimeEnd,
+ Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+ Map<String, Object> infoFilters, Map<String, String> configFilters,
+ Set<String> metricFilters, Set<String> eventFilters,
+ EnumSet<Field> fieldsToRetrieve) {
+ super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+ createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+ relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+ eventFilters, fieldsToRetrieve);
+ }
+
+ // Constructor used for single-entity lookups by entity id.
+ public ApplicationEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ String entityId, EnumSet<Field> fieldsToRetrieve) {
+ super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+ fieldsToRetrieve);
+ }
+
+ /**
+ * Uses the {@link ApplicationTable}.
+ */
+ protected BaseTable<?> getTable() {
+ return APPLICATION_TABLE;
+ }
+
+ // Fetches the single application row addressed by the reader's context
+ // fields. All cell versions are requested, presumably so complete metric
+ // time series can be read back -- TODO confirm against the writer.
+ @Override
+ protected Result getResult(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ byte[] rowKey =
+ ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
+ appId);
+ Get get = new Get(rowKey);
+ get.setMaxVersions(Integer.MAX_VALUE);
+ return table.getResult(hbaseConf, conn, get);
+ }
+
+ @Override
+ protected Iterable<Result> getResults(Configuration hbaseConf,
+ Connection conn) throws IOException {
+ // If getEntities() is called for an application, there can be at most
+ // one entity. If the entity passes the filter, it is returned. Otherwise,
+ // an empty set is returned.
+ byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
+ flowRunId, appId);
+ Get get = new Get(rowKey);
+ get.setMaxVersions(Integer.MAX_VALUE);
+ Result result = table.getResult(hbaseConf, conn, get);
+ // NOTE(review): parseEntity is invoked here only to evaluate the filters;
+ // the caller will parse the returned Result again. Acceptable for a
+ // single-row result, but a known duplication.
+ TimelineEntity entity = parseEntity(result);
+ Set<Result> set;
+ if (entity != null) {
+ set = Collections.singleton(result);
+ } else {
+ set = Collections.emptySet();
+ }
+ return set;
+ }
+
+ // Materializes a TimelineEntity from an application-table row. Returns
+ // null when the row is empty or any active filter (time window, relations,
+ // info, configs, events, metrics) rejects the entity. Each field group is
+ // read if requested via fieldsToRetrieve OR a filter on it is present;
+ // data read only for filtering is cleared before returning.
+ @Override
+ protected TimelineEntity parseEntity(Result result) throws IOException {
+ if (result == null || result.isEmpty()) {
+ return null;
+ }
+ TimelineEntity entity = new TimelineEntity();
+ entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+ // NOTE(review): assumes the ID, CREATED_TIME and MODIFIED_TIME columns
+ // are always populated by the writer; a null readResult would NPE below
+ // -- confirm.
+ String entityId = ApplicationColumn.ID.readResult(result).toString();
+ entity.setId(entityId);
+
+ // fetch created time
+ Number createdTime =
+ (Number)ApplicationColumn.CREATED_TIME.readResult(result);
+ entity.setCreatedTime(createdTime.longValue());
+ // Time-window filters apply only to multi-entity queries; a direct
+ // single-entity read returns the entity regardless.
+ if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
+ entity.getCreatedTime() > createdTimeEnd)) {
+ return null;
+ }
+
+ // fetch modified time
+ Number modifiedTime =
+ (Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
+ entity.setModifiedTime(modifiedTime.longValue());
+ if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
+ entity.getModifiedTime() > modifiedTimeEnd)) {
+ return null;
+ }
+
+ // fetch is related to entities
+ boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+ readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+ true);
+ if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+ entity.getIsRelatedToEntities(), isRelatedTo)) {
+ return null;
+ }
+ // Drop data that was read only to evaluate the filter.
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+ entity.getIsRelatedToEntities().clear();
+ }
+ }
+
+ // fetch relates to entities
+ boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+ readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+ false);
+ if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+ entity.getRelatesToEntities(), relatesTo)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+ entity.getRelatesToEntities().clear();
+ }
+ }
+
+ // fetch info
+ boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+ readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
+ if (checkInfo &&
+ !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.INFO)) {
+ entity.getInfo().clear();
+ }
+ }
+
+ // fetch configs
+ boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+ readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
+ if (checkConfigs && !TimelineReaderUtils.matchFilters(
+ entity.getConfigs(), configFilters)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.CONFIGS)) {
+ entity.getConfigs().clear();
+ }
+ }
+
+ // fetch events
+ boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+ readEvents(entity, result, true);
+ if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+ entity.getEvents(), eventFilters)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.EVENTS)) {
+ entity.getEvents().clear();
+ }
+ }
+
+ // fetch metrics
+ boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+ readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
+ if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+ entity.getMetrics(), metricFilters)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.METRICS)) {
+ entity.getMetrics().clear();
+ }
+ }
+ return entity;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
new file mode 100644
index 0000000..d5ece2e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for flow activity entities that are stored in the
+ * flow activity table.
+ */
+class FlowActivityEntityReader extends TimelineEntityReader {
+ // Single shared schema descriptor for the flow activity table.
+ private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
+ new FlowActivityTable();
+
+ // Constructor used for multi-entity queries (getEntities). Only clusterId
+ // and limit are used by this reader; the remaining filter arguments are
+ // accepted for signature parity with the other readers.
+ public FlowActivityEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ Long limit, Long createdTimeBegin, Long createdTimeEnd,
+ Long modifiedTimeBegin, Long modifiedTimeEnd,
+ Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+ Map<String, Object> infoFilters, Map<String, String> configFilters,
+ Set<String> metricFilters, Set<String> eventFilters,
+ EnumSet<Field> fieldsToRetrieve) {
+ super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+ createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+ relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+ eventFilters, fieldsToRetrieve);
+ }
+
+ // Constructor used for single-entity lookups; getResult below rejects
+ // such queries for this reader.
+ public FlowActivityEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ String entityId, EnumSet<Field> fieldsToRetrieve) {
+ super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+ fieldsToRetrieve);
+ }
+
+ /**
+ * Uses the {@link FlowActivityTable}.
+ */
+ @Override
+ protected BaseTable<?> getTable() {
+ return FLOW_ACTIVITY_TABLE;
+ }
+
+ /**
+ * Since this is strictly sorted by the row key, it is sufficient to collect
+ * the first results as specified by the limit.
+ */
+ @Override
+ public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+ Connection conn) throws IOException {
+ validateParams();
+ augmentParams(hbaseConf, conn);
+
+ // TreeSet keeps the entities in their natural order and deduplicates.
+ NavigableSet<TimelineEntity> entities = new TreeSet<>();
+ Iterable<Result> results = getResults(hbaseConf, conn);
+ for (Result result : results) {
+ TimelineEntity entity = parseEntity(result);
+ if (entity == null) {
+ continue;
+ }
+ entities.add(entity);
+ // The scanner can return more rows than the limit (see getResults),
+ // so the limit is enforced here.
+ if (entities.size() == limit) {
+ break;
+ }
+ }
+ return entities;
+ }
+
+ @Override
+ protected void validateParams() {
+ // Flow activity queries are cluster-scoped; only the cluster is required.
+ Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+ }
+
+ @Override
+ protected void augmentParams(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ // Default the limit when absent or negative; nothing else to fill in.
+ if (limit == null || limit < 0) {
+ limit = TimelineReader.DEFAULT_LIMIT;
+ }
+ }
+
+ @Override
+ protected Result getResult(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ throw new UnsupportedOperationException(
+ "we don't support a single entity query");
+ }
+
+ // Scans all flow activity rows for the cluster, bounded by a PageFilter.
+ @Override
+ protected Iterable<Result> getResults(Configuration hbaseConf,
+ Connection conn) throws IOException {
+ Scan scan = new Scan();
+ scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
+ // use the page filter to limit the result to the page size
+ // the scanner may still return more than the limit; therefore we need to
+ // read the right number as we iterate
+ scan.setFilter(new PageFilter(limit));
+ return table.getResultScanner(hbaseConf, conn, scan);
+ }
+
+ // Builds a FlowActivityEntity from one row: the row key encodes the day
+ // timestamp, user and flow name; each RUN_ID column cell contributes one
+ // flow run (run id in the qualifier, flow version in the value).
+ @Override
+ protected TimelineEntity parseEntity(Result result) throws IOException {
+ FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
+
+ long time = rowKey.getDayTimestamp();
+ String user = rowKey.getUserId();
+ String flowName = rowKey.getFlowId();
+
+ FlowActivityEntity flowActivity =
+ new FlowActivityEntity(clusterId, time, user, flowName);
+ // set the id
+ // NOTE(review): getId() presumably synthesizes the id from the fields
+ // set above; assigning it back pins the value on the entity -- confirm.
+ flowActivity.setId(flowActivity.getId());
+ // get the list of run ids along with the version that are associated with
+ // this flow on this day
+ Map<String, Object> runIdsMap =
+ FlowActivityColumnPrefix.RUN_ID.readResults(result);
+ for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
+ Long runId = Long.valueOf(e.getKey());
+ String version = (String)e.getValue();
+ FlowRunEntity flowRun = new FlowRunEntity();
+ flowRun.setUser(user);
+ flowRun.setName(flowName);
+ flowRun.setRunId(runId);
+ flowRun.setVersion(version);
+ // set the id
+ flowRun.setId(flowRun.getId());
+ flowActivity.addFlowRun(flowRun);
+ }
+
+ return flowActivity;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
new file mode 100644
index 0000000..ced795d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for flow run entities that are stored in the flow run
+ * table.
+ */
+class FlowRunEntityReader extends TimelineEntityReader {
+ private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
+
+ public FlowRunEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ Long limit, Long createdTimeBegin, Long createdTimeEnd,
+ Long modifiedTimeBegin, Long modifiedTimeEnd,
+ Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+ Map<String, Object> infoFilters, Map<String, String> configFilters,
+ Set<String> metricFilters, Set<String> eventFilters,
+ EnumSet<Field> fieldsToRetrieve) {
+ super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+ createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+ relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+ eventFilters, fieldsToRetrieve);
+ }
+
+ public FlowRunEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ String entityId, EnumSet<Field> fieldsToRetrieve) {
+ super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+ fieldsToRetrieve);
+ }
+
+ /**
+ * Uses the {@link FlowRunTable}.
+ */
+ @Override
+ protected BaseTable<?> getTable() {
+ return FLOW_RUN_TABLE;
+ }
+
+ @Override
+ protected void validateParams() {
+ Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+ Preconditions.checkNotNull(userId, "userId shouldn't be null");
+ Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
+ Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
+ }
+
+ @Override
+ protected void augmentParams(Configuration hbaseConf, Connection conn) {
+ }
+
+ @Override
+ protected Result getResult(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ byte[] rowKey =
+ FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId);
+ Get get = new Get(rowKey);
+ get.setMaxVersions(Integer.MAX_VALUE);
+ return table.getResult(hbaseConf, conn, get);
+ }
+
+ @Override
+ protected Iterable<Result> getResults(Configuration hbaseConf,
+ Connection conn) throws IOException {
+ throw new UnsupportedOperationException(
+ "multiple entity query is not supported");
+ }
+
+ @Override
+ protected TimelineEntity parseEntity(Result result) throws IOException {
+ FlowRunEntity flowRun = new FlowRunEntity();
+ flowRun.setUser(userId);
+ flowRun.setName(flowId);
+ flowRun.setRunId(flowRunId);
+
+ // read the start time
+ Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
+ if (startTime != null) {
+ flowRun.setStartTime(startTime);
+ }
+ // read the end time if available
+ Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
+ if (endTime != null) {
+ flowRun.setMaxEndTime(endTime);
+ }
+
+ // read the flow version
+ String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
+ if (version != null) {
+ flowRun.setVersion(version);
+ }
+
+ // read metrics
+ readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
+
+ // set the id
+ flowRun.setId(flowRun.getId());
+ return flowRun;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2e7e0f0b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
new file mode 100644
index 0000000..466914b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Timeline entity reader for generic entities that are stored in the entity
+ * table.
+ */
+class GenericEntityReader extends TimelineEntityReader {
+ private static final EntityTable ENTITY_TABLE = new EntityTable();
+ private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
+
+ private static final long DEFAULT_BEGIN_TIME = 0L;
+ private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
+ /**
+ * Used to look up the flow context.
+ */
+ private final AppToFlowTable appToFlowTable = new AppToFlowTable();
+
+ public GenericEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ Long limit, Long createdTimeBegin, Long createdTimeEnd,
+ Long modifiedTimeBegin, Long modifiedTimeEnd,
+ Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+ Map<String, Object> infoFilters, Map<String, String> configFilters,
+ Set<String> metricFilters, Set<String> eventFilters,
+ EnumSet<Field> fieldsToRetrieve) {
+ super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
+ createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
+ relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
+ eventFilters, fieldsToRetrieve);
+ }
+
+ public GenericEntityReader(String userId, String clusterId,
+ String flowId, Long flowRunId, String appId, String entityType,
+ String entityId, EnumSet<Field> fieldsToRetrieve) {
+ super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
+ fieldsToRetrieve);
+ }
+
+ /**
+ * Uses the {@link EntityTable}.
+ */
+ protected BaseTable<?> getTable() {
+ return ENTITY_TABLE;
+ }
+
+ private FlowContext lookupFlowContext(String clusterId, String appId,
+ Configuration hbaseConf, Connection conn) throws IOException {
+ byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+ Get get = new Get(rowKey);
+ Result result = appToFlowTable.getResult(hbaseConf, conn, get);
+ if (result != null && !result.isEmpty()) {
+ return new FlowContext(
+ AppToFlowColumn.FLOW_ID.readResult(result).toString(),
+ ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+ } else {
+ throw new IOException(
+ "Unable to find the context flow ID and flow run ID for clusterId=" +
+ clusterId + ", appId=" + appId);
+ }
+ }
+
+ private static class FlowContext {
+ private final String flowId;
+ private final Long flowRunId;
+ public FlowContext(String flowId, Long flowRunId) {
+ this.flowId = flowId;
+ this.flowRunId = flowRunId;
+ }
+ }
+
+ @Override
+ protected void validateParams() {
+ Preconditions.checkNotNull(userId, "userId shouldn't be null");
+ Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+ Preconditions.checkNotNull(appId, "appId shouldn't be null");
+ Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+ if (singleEntityRead) {
+ Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
+ }
+ }
+
+ @Override
+ protected void augmentParams(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ // In reality both should be null or neither should be null
+ if (flowId == null || flowRunId == null) {
+ FlowContext context =
+ lookupFlowContext(clusterId, appId, hbaseConf, conn);
+ flowId = context.flowId;
+ flowRunId = context.flowRunId;
+ }
+ if (fieldsToRetrieve == null) {
+ fieldsToRetrieve = EnumSet.noneOf(Field.class);
+ }
+ if (!singleEntityRead) {
+ if (limit == null || limit < 0) {
+ limit = TimelineReader.DEFAULT_LIMIT;
+ }
+ if (createdTimeBegin == null) {
+ createdTimeBegin = DEFAULT_BEGIN_TIME;
+ }
+ if (createdTimeEnd == null) {
+ createdTimeEnd = DEFAULT_END_TIME;
+ }
+ if (modifiedTimeBegin == null) {
+ modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+ }
+ if (modifiedTimeEnd == null) {
+ modifiedTimeEnd = DEFAULT_END_TIME;
+ }
+ }
+ }
+
+ @Override
+ protected Result getResult(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ byte[] rowKey =
+ EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
+ entityType, entityId);
+ Get get = new Get(rowKey);
+ get.setMaxVersions(Integer.MAX_VALUE);
+ return table.getResult(hbaseConf, conn, get);
+ }
+
+ @Override
+ protected Iterable<Result> getResults(Configuration hbaseConf,
+ Connection conn) throws IOException {
+ // Scan through part of the table to find the entities belong to one app
+ // and one type
+ Scan scan = new Scan();
+ scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+ clusterId, userId, flowId, flowRunId, appId, entityType));
+ scan.setMaxVersions(Integer.MAX_VALUE);
+ return table.getResultScanner(hbaseConf, conn, scan);
+ }
+
+ @Override
+ protected TimelineEntity parseEntity(Result result) throws IOException {
+ if (result == null || result.isEmpty()) {
+ return null;
+ }
+ TimelineEntity entity = new TimelineEntity();
+ String entityType = EntityColumn.TYPE.readResult(result).toString();
+ entity.setType(entityType);
+ String entityId = EntityColumn.ID.readResult(result).toString();
+ entity.setId(entityId);
+
+ // fetch created time
+ Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
+ entity.setCreatedTime(createdTime.longValue());
+ if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
+ entity.getCreatedTime() > createdTimeEnd)) {
+ return null;
+ }
+
+ // fetch modified time
+ Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
+ entity.setModifiedTime(modifiedTime.longValue());
+ if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
+ entity.getModifiedTime() > modifiedTimeEnd)) {
+ return null;
+ }
+
+ // fetch is related to entities
+ boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+ readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+ if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+ entity.getIsRelatedToEntities(), isRelatedTo)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+ entity.getIsRelatedToEntities().clear();
+ }
+ }
+
+ // fetch relates to entities
+ boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+ readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+ if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+ entity.getRelatesToEntities(), relatesTo)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+ entity.getRelatesToEntities().clear();
+ }
+ }
+
+ // fetch info
+ boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+ readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
+ if (checkInfo &&
+ !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.INFO)) {
+ entity.getInfo().clear();
+ }
+ }
+
+ // fetch configs
+ boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+ readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
+ if (checkConfigs && !TimelineReaderUtils.matchFilters(
+ entity.getConfigs(), configFilters)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.CONFIGS)) {
+ entity.getConfigs().clear();
+ }
+ }
+
+ // fetch events
+ boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+ readEvents(entity, result, false);
+ if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+ entity.getEvents(), eventFilters)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.EVENTS)) {
+ entity.getEvents().clear();
+ }
+ }
+
+ // fetch metrics
+ boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+ if (fieldsToRetrieve.contains(Field.ALL) ||
+ fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+ readMetrics(entity, result, EntityColumnPrefix.METRIC);
+ if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+ entity.getMetrics(), metricFilters)) {
+ return null;
+ }
+ if (!fieldsToRetrieve.contains(Field.ALL) &&
+ !fieldsToRetrieve.contains(Field.METRICS)) {
+ entity.getMetrics().clear();
+ }
+ }
+ return entity;
+ }
+
+ /**
+ * Helper method for reading relationship.
+ */
+ protected <T> void readRelationship(
+ TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+ boolean isRelatedTo) throws IOException {
+ // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+ Map<String, Object> columns = prefix.readResults(result);
+ for (Map.Entry<String, Object> column : columns.entrySet()) {
+ for (String id : Separator.VALUES.splitEncoded(
+ column.getValue().toString())) {
+ if (isRelatedTo) {
+ entity.addIsRelatedToEntity(column.getKey(), id);
+ } else {
+ entity.addRelatesToEntity(column.getKey(), id);
+ }
+ }
+ }
+ }
+
+ /**
+ * Helper method for reading key-value pairs for either info or config.
+ */
+ protected <T> void readKeyValuePairs(
+ TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
+ boolean isConfig) throws IOException {
+ // info and configuration are of type Map<String, Object or String>
+ Map<String, Object> columns = prefix.readResults(result);
+ if (isConfig) {
+ for (Map.Entry<String, Object> column : columns.entrySet()) {
+ entity.addConfig(column.getKey(), column.getValue().toString());
+ }
+ } else {
+ entity.addInfo(columns);
+ }
+ }
+
+ /**
+ * Read events from the entity table or the application table. The column name
+ * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
+ * if there is no info associated with the event.
+ *
+ * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
+ * schema description.
+ */
+ protected void readEvents(TimelineEntity entity, Result result,
+ boolean isApplication) throws IOException {
+ Map<String, TimelineEvent> eventsMap = new HashMap<>();
+ Map<?, Object> eventsResult = isApplication ?
+ ApplicationColumnPrefix.EVENT.
+ readResultsHavingCompoundColumnQualifiers(result) :
+ EntityColumnPrefix.EVENT.
+ readResultsHavingCompoundColumnQualifiers(result);
+ for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
+ byte[][] karr = (byte[][])eventResult.getKey();
+ // the column name is of the form "eventId=timestamp=infoKey"
+ if (karr.length == 3) {
+ String id = Bytes.toString(karr[0]);
+ long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+ String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+ TimelineEvent event = eventsMap.get(key);
+ if (event == null) {
+ event = new TimelineEvent();
+ event.setId(id);
+ event.setTimestamp(ts);
+ eventsMap.put(key, event);
+ }
+ // handle empty info
+ String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
+ if (infoKey != null) {
+ event.addInfo(infoKey, eventResult.getValue());
+ }
+ } else {
+ LOG.warn("incorrectly formatted column name: it will be discarded");
+ continue;
+ }
+ }
+ Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+ entity.addEvents(eventsSet);
+ }
+}