You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:42 UTC
[25/50] [abbrv] hadoop git commit: YARN-3864. Implement support for
querying single app and all apps for a flow run (Varun Saxena via sjlee)
YARN-3864. Implement support for querying single app and all apps for a flow run (Varun Saxena via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2b6784ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2b6784ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2b6784ab
Branch: refs/heads/feature-YARN-2928
Commit: 2b6784ab9585dc8dea97b0c99b51a64a4fee5c8d
Parents: 0839cd1
Author: Sangjin Lee <sj...@apache.org>
Authored: Mon Oct 5 13:14:11 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:41:58 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../reader/TimelineReaderWebServices.java | 206 +++++-
.../storage/ApplicationEntityReader.java | 64 +-
.../storage/FlowActivityEntityReader.java | 33 +-
.../storage/FlowRunEntityReader.java | 2 +-
.../storage/GenericEntityReader.java | 16 +-
.../storage/TimelineEntityReader.java | 21 +-
.../storage/TimelineEntityReaderFactory.java | 2 +-
.../storage/application/ApplicationRowKey.java | 34 +
.../TestTimelineReaderWebServicesFlowRun.java | 405 -----------
...stTimelineReaderWebServicesHBaseStorage.java | 673 +++++++++++++++++++
11 files changed, 1000 insertions(+), 459 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0d2537e..902d05e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -118,6 +118,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-4210. HBase reader throws NPE if Get returns no rows (Varun Saxena
via vrushali)
+ YARN-3864. Implement support for querying single app and all apps for a
+ flow run (Varun Saxena via sjlee)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index a327099..610f74c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -286,7 +286,7 @@ public class TimelineReaderWebServices {
@QueryParam("eventfilters") String eventfilters,
@QueryParam("fields") String fields) {
String url = req.getRequestURI() +
- (null == req.getQueryString() ? "" :
+ (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
UserGroupInformation callerUGI = getUser(req);
LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
@@ -310,7 +310,7 @@ public class TimelineReaderWebServices {
parseFieldsStr(fields, COMMA_DELIMITER));
} catch (Exception e) {
handleException(e, url, startTime,
- "createdTime or modifiedTime start/end or limit or flowId");
+ "createdTime or modifiedTime start/end or limit or flowrunid");
}
long endTime = Time.monotonicNow();
if (entities == null) {
@@ -360,7 +360,7 @@ public class TimelineReaderWebServices {
@QueryParam("flowrunid") String flowRunId,
@QueryParam("fields") String fields) {
String url = req.getRequestURI() +
- (null == req.getQueryString() ? "" :
+ (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
UserGroupInformation callerUGI = getUser(req);
LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
@@ -420,7 +420,7 @@ public class TimelineReaderWebServices {
@QueryParam("userid") String userId,
@QueryParam("fields") String fields) {
String url = req.getRequestURI() +
- (null == req.getQueryString() ? "" :
+ (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
UserGroupInformation callerUGI = getUser(req);
LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
@@ -477,7 +477,7 @@ public class TimelineReaderWebServices {
@QueryParam("limit") String limit,
@QueryParam("fields") String fields) {
String url = req.getRequestURI() +
- (null == req.getQueryString() ? "" :
+ (req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
UserGroupInformation callerUGI = getUser(req);
LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
@@ -502,4 +502,200 @@ public class TimelineReaderWebServices {
" (Took " + (endTime - startTime) + " ms.)");
return entities;
}
+
+ /**
+ * Return a single app for given app id. Cluster ID is not provided by
+ * client so default cluster ID has to be taken.
+ */
+ @GET
+ @Path("/app/{appid}/")
+ @Produces(MediaType.APPLICATION_JSON)
+ public TimelineEntity getApp(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("appid") String appId,
+ @QueryParam("flowid") String flowId,
+ @QueryParam("flowrunid") String flowRunId,
+ @QueryParam("userid") String userId,
+ @QueryParam("fields") String fields) {
+ return getApp(req, res, null, appId, flowId, flowRunId, userId, fields);
+ }
+
+ /**
+ * Return a single app for given cluster id and app id.
+ */
+ @GET
+ @Path("/app/{clusterid}/{appid}/")
+ @Produces(MediaType.APPLICATION_JSON)
+ public TimelineEntity getApp(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("clusterid") String clusterId,
+ @PathParam("appid") String appId,
+ @QueryParam("flowid") String flowId,
+ @QueryParam("flowrunid") String flowRunId,
+ @QueryParam("userid") String userId,
+ @QueryParam("fields") String fields) {
+ String url = req.getRequestURI() +
+ (req.getQueryString() == null ? "" :
+ QUERY_STRING_SEP + req.getQueryString());
+ UserGroupInformation callerUGI = getUser(req);
+ LOG.info("Received URL " + url + " from user " + getUserName(callerUGI));
+ long startTime = Time.monotonicNow();
+ init(res);
+ TimelineReaderManager timelineReaderManager = getTimelineReaderManager();
+ TimelineEntity entity = null;
+ try {
+ entity = timelineReaderManager.getEntity(
+ parseUser(callerUGI, userId), parseStr(clusterId),
+ parseStr(flowId), parseLongStr(flowRunId), parseStr(appId),
+ TimelineEntityType.YARN_APPLICATION.toString(), null,
+ parseFieldsStr(fields, COMMA_DELIMITER));
+ } catch (Exception e) {
+ handleException(e, url, startTime, "flowrunid");
+ }
+ long endTime = Time.monotonicNow();
+ if (entity == null) {
+ LOG.info("Processed URL " + url + " but app not found" + " (Took " +
+ (endTime - startTime) + " ms.)");
+ throw new NotFoundException("App " + appId + " not found");
+ }
+ LOG.info("Processed URL " + url +
+ " (Took " + (endTime - startTime) + " ms.)");
+ return entity;
+ }
+
+ /**
+ * Return a list of apps for given flow id and flow run id. Cluster ID is not
+ * provided by client so default cluster ID has to be taken. If number of
+ * matching apps are more than the limit, most recent apps till the limit is
+ * reached, will be returned.
+ */
+ @GET
+ @Path("/flowrunapps/{flowid}/{flowrunid}/")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Set<TimelineEntity> getFlowRunApps(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("flowid") String flowId,
+ @PathParam("flowrunid") String flowRunId,
+ @QueryParam("userid") String userId,
+ @QueryParam("limit") String limit,
+ @QueryParam("createdtimestart") String createdTimeStart,
+ @QueryParam("createdtimeend") String createdTimeEnd,
+ @QueryParam("modifiedtimestart") String modifiedTimeStart,
+ @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+ @QueryParam("relatesto") String relatesTo,
+ @QueryParam("isrelatedto") String isRelatedTo,
+ @QueryParam("infofilters") String infofilters,
+ @QueryParam("conffilters") String conffilters,
+ @QueryParam("metricfilters") String metricfilters,
+ @QueryParam("eventfilters") String eventfilters,
+ @QueryParam("fields") String fields) {
+ return getEntities(req, res, null, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
+ flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+ modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+ metricfilters, eventfilters, fields);
+ }
+
+ /**
+ * Return a list of apps for a given cluster id, flow id and flow run id. If
+ * number of matching apps are more than the limit, most recent apps till the
+ * limit is reached, will be returned.
+ */
+ @GET
+ @Path("/flowrunapps/{clusterid}/{flowid}/{flowrunid}/")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Set<TimelineEntity> getFlowRunApps(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("clusterid") String clusterId,
+ @PathParam("flowid") String flowId,
+ @PathParam("flowrunid") String flowRunId,
+ @QueryParam("userid") String userId,
+ @QueryParam("limit") String limit,
+ @QueryParam("createdtimestart") String createdTimeStart,
+ @QueryParam("createdtimeend") String createdTimeEnd,
+ @QueryParam("modifiedtimestart") String modifiedTimeStart,
+ @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+ @QueryParam("relatesto") String relatesTo,
+ @QueryParam("isrelatedto") String isRelatedTo,
+ @QueryParam("infofilters") String infofilters,
+ @QueryParam("conffilters") String conffilters,
+ @QueryParam("metricfilters") String metricfilters,
+ @QueryParam("eventfilters") String eventfilters,
+ @QueryParam("fields") String fields) {
+ return getEntities(req, res, clusterId, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
+ flowRunId, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+ modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+ metricfilters, eventfilters, fields);
+ }
+
+ /**
+ * Return a list of apps for given flow id. Cluster ID is not provided by
+ * client so default cluster ID has to be taken. If number of matching apps
+ * are more than the limit, most recent apps till the limit is reached, will
+ * be returned.
+ */
+ @GET
+ @Path("/flowapps/{flowid}/")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Set<TimelineEntity> getFlowApps(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("flowid") String flowId,
+ @QueryParam("userid") String userId,
+ @QueryParam("limit") String limit,
+ @QueryParam("createdtimestart") String createdTimeStart,
+ @QueryParam("createdtimeend") String createdTimeEnd,
+ @QueryParam("modifiedtimestart") String modifiedTimeStart,
+ @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+ @QueryParam("relatesto") String relatesTo,
+ @QueryParam("isrelatedto") String isRelatedTo,
+ @QueryParam("infofilters") String infofilters,
+ @QueryParam("conffilters") String conffilters,
+ @QueryParam("metricfilters") String metricfilters,
+ @QueryParam("eventfilters") String eventfilters,
+ @QueryParam("fields") String fields) {
+ return getEntities(req, res, null, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
+ null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+ modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+ metricfilters, eventfilters, fields);
+ }
+
+ /**
+ * Return a list of apps for a given cluster id and flow id. If number of
+ * matching apps are more than the limit, most recent apps till the limit is
+ * reached, will be returned.
+ */
+ @GET
+ @Path("/flowapps/{clusterid}/{flowid}/")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Set<TimelineEntity> getFlowApps(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @PathParam("clusterid") String clusterId,
+ @PathParam("flowid") String flowId,
+ @QueryParam("userid") String userId,
+ @QueryParam("limit") String limit,
+ @QueryParam("createdtimestart") String createdTimeStart,
+ @QueryParam("createdtimeend") String createdTimeEnd,
+ @QueryParam("modifiedtimestart") String modifiedTimeStart,
+ @QueryParam("modifiedtimeend") String modifiedTimeEnd,
+ @QueryParam("relatesto") String relatesTo,
+ @QueryParam("isrelatedto") String isRelatedTo,
+ @QueryParam("infofilters") String infofilters,
+ @QueryParam("conffilters") String conffilters,
+ @QueryParam("metricfilters") String metricfilters,
+ @QueryParam("eventfilters") String eventfilters,
+ @QueryParam("fields") String fields) {
+ return getEntities(req, res, clusterId, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), userId, flowId,
+ null, limit, createdTimeStart, createdTimeEnd, modifiedTimeStart,
+ modifiedTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters,
+ metricfilters, eventfilters, fields);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
index d5b5d63..61954e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
@@ -28,6 +27,8 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import com.google.common.base.Preconditions;
+
/**
* Timeline entity reader for application entities that are stored in the
* application table.
@@ -57,7 +60,7 @@ class ApplicationEntityReader extends GenericEntityReader {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve);
+ eventFilters, fieldsToRetrieve, true);
}
public ApplicationEntityReader(String userId, String clusterId,
@@ -86,10 +89,63 @@ class ApplicationEntityReader extends GenericEntityReader {
}
@Override
+ protected void validateParams() {
+ Preconditions.checkNotNull(userId, "userId shouldn't be null");
+ Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+ Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+ if (singleEntityRead) {
+ Preconditions.checkNotNull(appId, "appId shouldn't be null");
+ } else {
+ Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
+ }
+ }
+
+ @Override
+ protected void augmentParams(Configuration hbaseConf, Connection conn)
+ throws IOException {
+ if (singleEntityRead) {
+ if (flowId == null || flowRunId == null) {
+ FlowContext context =
+ lookupFlowContext(clusterId, appId, hbaseConf, conn);
+ flowId = context.flowId;
+ flowRunId = context.flowRunId;
+ }
+ }
+ if (fieldsToRetrieve == null) {
+ fieldsToRetrieve = EnumSet.noneOf(Field.class);
+ }
+ if (!singleEntityRead) {
+ if (limit == null || limit < 0) {
+ limit = TimelineReader.DEFAULT_LIMIT;
+ }
+ if (createdTimeBegin == null) {
+ createdTimeBegin = DEFAULT_BEGIN_TIME;
+ }
+ if (createdTimeEnd == null) {
+ createdTimeEnd = DEFAULT_END_TIME;
+ }
+ if (modifiedTimeBegin == null) {
+ modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+ }
+ if (modifiedTimeEnd == null) {
+ modifiedTimeEnd = DEFAULT_END_TIME;
+ }
+ }
+ }
+
+ @Override
protected ResultScanner getResults(Configuration hbaseConf,
Connection conn) throws IOException {
- throw new UnsupportedOperationException(
- "we don't support multiple apps query");
+ Scan scan = new Scan();
+ if (flowRunId != null) {
+ scan.setRowPrefixFilter(ApplicationRowKey.
+ getRowKeyPrefix(clusterId, userId, flowId, flowRunId));
+ } else {
+ scan.setRowPrefixFilter(ApplicationRowKey.
+ getRowKeyPrefix(clusterId, userId, flowId));
+ }
+ scan.setFilter(new PageFilter(limit));
+ return table.getResultScanner(hbaseConf, conn, scan);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
index e68ca17..70a0915 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
@@ -20,9 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;
-import java.util.NavigableSet;
import java.util.Set;
-import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
@@ -60,7 +58,7 @@ class FlowActivityEntityReader extends TimelineEntityReader {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve);
+ eventFilters, fieldsToRetrieve, true);
}
public FlowActivityEntityReader(String userId, String clusterId,
@@ -78,35 +76,6 @@ class FlowActivityEntityReader extends TimelineEntityReader {
return FLOW_ACTIVITY_TABLE;
}
- /**
- * Since this is strictly sorted by the row key, it is sufficient to collect
- * the first results as specified by the limit.
- */
- @Override
- public Set<TimelineEntity> readEntities(Configuration hbaseConf,
- Connection conn) throws IOException {
- validateParams();
- augmentParams(hbaseConf, conn);
-
- NavigableSet<TimelineEntity> entities = new TreeSet<>();
- ResultScanner results = getResults(hbaseConf, conn);
- try {
- for (Result result : results) {
- TimelineEntity entity = parseEntity(result);
- if (entity == null) {
- continue;
- }
- entities.add(entity);
- if (entities.size() == limit) {
- break;
- }
- }
- return entities;
- } finally {
- results.close();
- }
- }
-
@Override
protected void validateParams() {
Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
index b5d7ae5..90ce28f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
@@ -56,7 +56,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve);
+ eventFilters, fieldsToRetrieve, false);
}
public FlowRunEntityReader(String userId, String clusterId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
index 396a02b..42079d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -61,8 +61,8 @@ class GenericEntityReader extends TimelineEntityReader {
private static final EntityTable ENTITY_TABLE = new EntityTable();
private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
- private static final long DEFAULT_BEGIN_TIME = 0L;
- private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+ protected static final long DEFAULT_BEGIN_TIME = 0L;
+ protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
/**
* Used to look up the flow context.
@@ -76,11 +76,11 @@ class GenericEntityReader extends TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
- EnumSet<Field> fieldsToRetrieve) {
+ EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve);
+ eventFilters, fieldsToRetrieve, sortedKeys);
}
public GenericEntityReader(String userId, String clusterId,
@@ -97,7 +97,7 @@ class GenericEntityReader extends TimelineEntityReader {
return ENTITY_TABLE;
}
- private FlowContext lookupFlowContext(String clusterId, String appId,
+ protected FlowContext lookupFlowContext(String clusterId, String appId,
Configuration hbaseConf, Connection conn) throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
Get get = new Get(rowKey);
@@ -113,9 +113,9 @@ class GenericEntityReader extends TimelineEntityReader {
}
}
- private static class FlowContext {
- private final String flowId;
- private final Long flowRunId;
+ protected static class FlowContext {
+ protected final String flowId;
+ protected final Long flowRunId;
public FlowContext(String flowId, Long flowRunId) {
this.flowId = flowId;
this.flowRunId = flowRunId;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
index 93be2db..d4a659c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
@@ -74,6 +74,14 @@ abstract class TimelineEntityReader {
protected BaseTable<?> table;
/**
+ * Specifies whether keys for this table are sorted in a manner where entities
+ * can be retrieved by created time. If true, it will be sufficient to collect
+ * the first results as specified by the limit. Otherwise all matched entities
+ * will be fetched and then limit applied.
+ */
+ private boolean sortedKeys = false;
+
+ /**
* Instantiates a reader for multiple-entity reads.
*/
protected TimelineEntityReader(String userId, String clusterId,
@@ -83,8 +91,9 @@ abstract class TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
- EnumSet<Field> fieldsToRetrieve) {
+ EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
this.singleEntityRead = false;
+ this.sortedKeys = sortedKeys;
this.userId = userId;
this.clusterId = clusterId;
this.flowId = flowId;
@@ -162,8 +171,14 @@ abstract class TimelineEntityReader {
continue;
}
entities.add(entity);
- if (entities.size() > limit) {
- entities.pollLast();
+ if (!sortedKeys) {
+ if (entities.size() > limit) {
+ entities.pollLast();
+ }
+ } else {
+ if (entities.size() == limit) {
+ break;
+ }
}
}
return entities;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
index 4fdef40..f5341c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
@@ -91,7 +91,7 @@ class TimelineEntityReaderFactory {
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
infoFilters, configFilters, metricFilters, eventFilters,
- fieldsToRetrieve);
+ fieldsToRetrieve, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index e3b5a87..10e3c2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -61,6 +61,40 @@ public class ApplicationRowKey {
}
/**
+ * Constructs a row key prefix for the application table as follows:
+ * {@code clusterId!userName!flowId!}
+ *
+ * @param clusterId
+ * @param userId
+ * @param flowId
+ * @return byte array with the row key prefix
+ */
+ public static byte[] getRowKeyPrefix(String clusterId, String userId,
+ String flowId) {
+ byte[] first = Bytes.toBytes(
+ Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
+ return Separator.QUALIFIERS.join(first, new byte[0]);
+ }
+
+ /**
+ * Constructs a row key prefix for the application table as follows:
+ * {@code clusterId!userName!flowId!flowRunId!}
+ *
+ * @param clusterId
+ * @param userId
+ * @param flowId
+ * @param flowRunId
+ * @return byte array with the row key prefix
+ */
+ public static byte[] getRowKeyPrefix(String clusterId, String userId,
+ String flowId, Long flowRunId) {
+ byte[] first = Bytes.toBytes(
+ Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
+ byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+ return Separator.QUALIFIERS.join(first, second, new byte[0]);
+ }
+
+ /**
* Constructs a row key for the application table as follows:
* {@code clusterId!userName!flowId!flowRunId!AppId}
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
deleted file mode 100644
index e359f78..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesFlowRun.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.reader;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.HttpURLConnection;
-import java.net.URI;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.ws.rs.core.MediaType;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.GenericType;
-import com.sun.jersey.api.client.ClientResponse.Status;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-
-public class TestTimelineReaderWebServicesFlowRun {
- private int serverPort;
- private TimelineReaderServer server;
- private static HBaseTestingUtility util;
- private static long ts = System.currentTimeMillis();
-
- @BeforeClass
- public static void setup() throws Exception {
- util = new HBaseTestingUtility();
- Configuration conf = util.getConfiguration();
- conf.setInt("hfile.format.version", 3);
- util.startMiniCluster();
- TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
- loadData();
- }
-
- private static void loadData() throws Exception {
- String cluster = "cluster1";
- String user = "user1";
- String flow = "flow_name";
- String flowVersion = "CF7022C10F1354";
- Long runid = 1002345678919L;
- Long runid1 = 1002345678920L;
-
- TimelineEntities te = new TimelineEntities();
- TimelineEntity entity = new TimelineEntity();
- String id = "flowRunMetrics_test";
- String type = TimelineEntityType.YARN_APPLICATION.toString();
- entity.setId(id);
- entity.setType(type);
- Long cTime = 1425016501000L;
- entity.setCreatedTime(cTime);
-
- // add metrics
- Set<TimelineMetric> metrics = new HashSet<>();
- TimelineMetric m1 = new TimelineMetric();
- m1.setId("MAP_SLOT_MILLIS");
- Map<Long, Number> metricValues = new HashMap<Long, Number>();
- metricValues.put(ts - 100000, 2);
- metricValues.put(ts - 80000, 40);
- m1.setType(Type.TIME_SERIES);
- m1.setValues(metricValues);
- metrics.add(m1);
-
- m1 = new TimelineMetric();
- m1.setId("HDFS_BYTES_READ");
- metricValues = new HashMap<Long, Number>();
- metricValues.put(ts - 100000, 31);
- metricValues.put(ts - 80000, 57);
- m1.setType(Type.TIME_SERIES);
- m1.setValues(metricValues);
- metrics.add(m1);
- entity.addMetrics(metrics);
-
- TimelineEvent event = new TimelineEvent();
- event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- Long expTs = 1436512802000L;
- event.setTimestamp(expTs);
- String expKey = "foo_event";
- Object expVal = "test";
- event.addInfo(expKey, expVal);
- entity.addEvent(event);
- te.addEntity(entity);
-
- // write another application with same metric to this flow
- TimelineEntities te1 = new TimelineEntities();
- TimelineEntity entity1 = new TimelineEntity();
- id = "flowRunMetrics_test";
- type = TimelineEntityType.YARN_APPLICATION.toString();
- entity1.setId(id);
- entity1.setType(type);
- cTime = 1425016501000L;
- entity1.setCreatedTime(cTime);
- // add metrics
- metrics.clear();
- TimelineMetric m2 = new TimelineMetric();
- m2.setId("MAP_SLOT_MILLIS");
- metricValues = new HashMap<Long, Number>();
- metricValues.put(ts - 100000, 5L);
- metricValues.put(ts - 80000, 101L);
- m2.setType(Type.TIME_SERIES);
- m2.setValues(metricValues);
- metrics.add(m2);
- entity1.addMetrics(metrics);
- te1.addEntity(entity1);
-
- String flow2 = "flow_name2";
- String flowVersion2 = "CF7022C10F1454";
- Long runid2 = 2102356789046L;
- TimelineEntities te3 = new TimelineEntities();
- TimelineEntity entity3 = new TimelineEntity();
- id = "flowRunMetrics_test1";
- entity3.setId(id);
- entity3.setType(type);
- cTime = 1425016501030L;
- entity3.setCreatedTime(cTime);
- TimelineEvent event2 = new TimelineEvent();
- event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- event2.setTimestamp(1436512802030L);
- event2.addInfo("foo_event", "test");
- entity3.addEvent(event2);
- te3.addEntity(entity3);
-
- HBaseTimelineWriterImpl hbi = null;
- Configuration c1 = util.getConfiguration();
- try {
- hbi = new HBaseTimelineWriterImpl(c1);
- hbi.init(c1);
- String appName = "application_11111111111111_1111";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
- appName = "application_11111111111111_2222";
- hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
- hbi.write(cluster, user, flow, flowVersion, runid1, appName, te);
- appName = "application_11111111111111_2223";
- hbi.write(cluster, user, flow2, flowVersion2, runid2, appName, te3);
- hbi.flush();
- } finally {
- hbi.close();
- }
- }
-
- @AfterClass
- public static void tearDown() throws Exception {
- util.shutdownMiniCluster();
- }
-
- @Before
- public void init() throws Exception {
- try {
- Configuration config = util.getConfiguration();
- config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
- "localhost:0");
- config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
- config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
- "org.apache.hadoop.yarn.server.timelineservice.storage." +
- "HBaseTimelineReaderImpl");
- config.setInt("hfile.format.version", 3);
- server = new TimelineReaderServer();
- server.init(config);
- server.start();
- serverPort = server.getWebServerPort();
- } catch (Exception e) {
- Assert.fail("Web server failed to start");
- }
- }
-
- private static Client createClient() {
- ClientConfig cfg = new DefaultClientConfig();
- cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
- return new Client(new URLConnectionClientHandler(
- new DummyURLConnectionFactory()), cfg);
- }
-
- private static ClientResponse getResponse(Client client, URI uri)
- throws Exception {
- ClientResponse resp =
- client.resource(uri).accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- if (resp == null ||
- resp.getClientResponseStatus() != ClientResponse.Status.OK) {
- String msg = new String();
- if (resp != null) {
- msg = resp.getClientResponseStatus().toString();
- }
- throw new IOException("Incorrect response from timeline reader. " +
- "Status=" + msg);
- }
- return resp;
- }
-
- private static class DummyURLConnectionFactory
- implements HttpURLConnectionFactory {
-
- @Override
- public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
- try {
- return (HttpURLConnection)url.openConnection();
- } catch (UndeclaredThrowableException e) {
- throw new IOException(e.getCause());
- }
- }
- }
-
- private static TimelineMetric newMetric(String id, long ts, Number value) {
- TimelineMetric metric = new TimelineMetric();
- metric.setId(id);
- metric.addValue(ts, value);
- return metric;
- }
-
- private static boolean verifyMetricValues(Map<Long, Number> m1,
- Map<Long, Number> m2) {
- for (Map.Entry<Long, Number> entry : m1.entrySet()) {
- if (!m2.containsKey(entry.getKey())) {
- return false;
- }
- if (m2.get(entry.getKey()).equals(entry.getValue())) {
- return false;
- }
- }
- return true;
- }
-
- private static boolean verifyMetrics(
- TimelineMetric m, TimelineMetric... metrics) {
- for (TimelineMetric metric : metrics) {
- if (!metric.equals(m)) {
- continue;
- }
- if (!verifyMetricValues(metric.getValues(), m.getValues())) {
- continue;
- }
- return true;
- }
- return false;
- }
-
- private static void verifyHttpResponse(Client client, URI uri,
- Status status) {
- ClientResponse resp =
- client.resource(uri).accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertNotNull(resp);
- assertTrue("Response from server should have been " + status,
- resp.getClientResponseStatus().equals(status));
- }
-
- @Test
- public void testGetFlowRun() throws Exception {
- Client client = createClient();
- try {
- URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1");
- ClientResponse resp = getResponse(client, uri);
- FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
- assertNotNull(entity);
- assertEquals("user1@flow_name/1002345678919", entity.getId());
- assertEquals(2, entity.getMetrics().size());
- TimelineMetric m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
- TimelineMetric m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
- for (TimelineMetric metric : entity.getMetrics()) {
- assertTrue(verifyMetrics(metric, m1, m2));
- }
-
- // Query without specifying cluster ID.
- uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrun/flow_name/1002345678919?userid=user1");
- resp = getResponse(client, uri);
- entity = resp.getEntity(FlowRunEntity.class);
- assertNotNull(entity);
- assertEquals("user1@flow_name/1002345678919", entity.getId());
- assertEquals(2, entity.getMetrics().size());
- m1 = newMetric("HDFS_BYTES_READ", ts - 80000, 57L);
- m2 = newMetric("MAP_SLOT_MILLIS", ts - 80000, 141L);
- for (TimelineMetric metric : entity.getMetrics()) {
- assertTrue(verifyMetrics(metric, m1, m2));
- }
- } finally {
- client.destroy();
- }
- }
-
- @Test
- public void testGetFlows() throws Exception {
- Client client = createClient();
- try {
- URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flows/cluster1");
- ClientResponse resp = getResponse(client, uri);
- Set<FlowActivityEntity> entities =
- resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(2, entities.size());
- for (FlowActivityEntity entity : entities) {
- assertTrue((entity.getId().endsWith("@flow_name") &&
- entity.getFlowRuns().size() == 2) ||
- (entity.getId().endsWith("@flow_name2") &&
- entity.getFlowRuns().size() == 1));
- }
-
- // Query without specifying cluster ID.
- uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flows/");
- resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(2, entities.size());
-
- uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flows/cluster1?limit=1");
- resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(1, entities.size());
- } finally {
- client.destroy();
- }
- }
-
- @Test
- public void testGetFlowRunNotPresent() throws Exception {
- Client client = createClient();
- try {
- URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1");
- verifyHttpResponse(client, uri, Status.NOT_FOUND);
- } finally {
- client.destroy();
- }
- }
-
- @Test
- public void testGetFlowsNotPresent() throws Exception {
- Client client = createClient();
- try {
- URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
- "timeline/flows/cluster2");
- ClientResponse resp = getResponse(client, uri);
- Set<FlowActivityEntity> entities =
- resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
- assertNotNull(entities);
- assertEquals(0, entities.size());
- } finally {
- client.destroy();
- }
- }
-
- @After
- public void stop() throws Exception {
- if (server != null) {
- server.stop();
- server = null;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2b6784ab/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
new file mode 100644
index 0000000..a89d2fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+public class TestTimelineReaderWebServicesHBaseStorage {
+ private int serverPort;
+ private TimelineReaderServer server;
+ private static HBaseTestingUtility util;
+ private static long ts = System.currentTimeMillis();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ util = new HBaseTestingUtility();
+ Configuration conf = util.getConfiguration();
+ conf.setInt("hfile.format.version", 3);
+ util.startMiniCluster();
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+ loadData();
+ }
+
+ private static void loadData() throws Exception {
+ String cluster = "cluster1";
+ String user = "user1";
+ String flow = "flow_name";
+ String flowVersion = "CF7022C10F1354";
+ Long runid = 1002345678919L;
+ Long runid1 = 1002345678920L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entity = new TimelineEntity();
+ String id = "application_1111111111_1111";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+ entity.addConfig("cfg2", "value1");
+
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId("MAP_SLOT_MILLIS");
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ metricValues.put(ts - 100000, 2);
+ metricValues.put(ts - 80000, 40);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ m1 = new TimelineMetric();
+ m1.setId("HDFS_BYTES_READ");
+ metricValues = new HashMap<Long, Number>();
+ metricValues.put(ts - 100000, 31);
+ metricValues.put(ts - 80000, 57);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity.addMetrics(metrics);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ Long expTs = 1436512802000L;
+ event.setTimestamp(expTs);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+ TimelineEvent event11 = new TimelineEvent();
+ event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ expTs = 1436512802010L;
+ event11.setTimestamp(expTs);
+ entity.addEvent(event11);
+
+ te.addEntity(entity);
+
+ // write another application with same metric to this flow
+ TimelineEntities te1 = new TimelineEntities();
+ TimelineEntity entity1 = new TimelineEntity();
+ id = "application_1111111111_2222";
+ type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity1.setId(id);
+ entity1.setType(type);
+ cTime = 1425016501000L;
+ entity1.setCreatedTime(cTime);
+ entity1.addConfig("cfg1", "value1");
+ // add metrics
+ metrics.clear();
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId("MAP_SLOT_MILLIS");
+ metricValues = new HashMap<Long, Number>();
+ metricValues.put(ts - 100000, 5L);
+ metricValues.put(ts - 80000, 101L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+ entity1.addMetrics(metrics);
+ TimelineEvent event1 = new TimelineEvent();
+ event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event1.setTimestamp(expTs);
+ event1.addInfo(expKey, expVal);
+ entity1.addEvent(event1);
+ te1.addEntity(entity1);
+
+ String flow2 = "flow_name2";
+ String flowVersion2 = "CF7022C10F1454";
+ Long runid2 = 2102356789046L;
+ TimelineEntities te3 = new TimelineEntities();
+ TimelineEntity entity3 = new TimelineEntity();
+ id = "application_11111111111111_2223";
+ entity3.setId(id);
+ entity3.setType(type);
+ cTime = 1425016501030L;
+ entity3.setCreatedTime(cTime);
+ TimelineEvent event2 = new TimelineEvent();
+ event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event2.setTimestamp(1436512802030L);
+ event2.addInfo("foo_event", "test");
+ entity3.addEvent(event2);
+ te3.addEntity(entity3);
+
+ TimelineEntities te4 = new TimelineEntities();
+ TimelineEntity entity4 = new TimelineEntity();
+ id = "application_1111111111_2224";
+ entity4.setId(id);
+ entity4.setType(type);
+ cTime = 1425016501034L;
+ entity4.setCreatedTime(cTime);
+ TimelineEvent event4 = new TimelineEvent();
+ event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event4.setTimestamp(1436512802037L);
+ event4.addInfo("foo_event", "test");
+ entity4.addEvent(event4);
+ te4.addEntity(entity4);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te);
+ hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1);
+ hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4);
+ hbi.write(cluster, user, flow2,
+ flowVersion2, runid2, entity3.getId(), te3);
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ @Before
+ public void init() throws Exception {
+ try {
+ Configuration config = util.getConfiguration();
+ config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+ "localhost:0");
+ config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+ config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
+ "org.apache.hadoop.yarn.server.timelineservice.storage." +
+ "HBaseTimelineReaderImpl");
+ config.setInt("hfile.format.version", 3);
+ server = new TimelineReaderServer();
+ server.init(config);
+ server.start();
+ serverPort = server.getWebServerPort();
+ } catch (Exception e) {
+ Assert.fail("Web server failed to start");
+ }
+ }
+
+ private static Client createClient() {
+ ClientConfig cfg = new DefaultClientConfig();
+ cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+ return new Client(new URLConnectionClientHandler(
+ new DummyURLConnectionFactory()), cfg);
+ }
+
+ private static ClientResponse getResponse(Client client, URI uri)
+ throws Exception {
+ ClientResponse resp =
+ client.resource(uri).accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ if (resp == null ||
+ resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+ String msg = new String();
+ if (resp != null) {
+ msg = resp.getClientResponseStatus().toString();
+ }
+ throw new IOException("Incorrect response from timeline reader. " +
+ "Status=" + msg);
+ }
+ return resp;
+ }
+
+ private static class DummyURLConnectionFactory
+ implements HttpURLConnectionFactory {
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
+ try {
+ return (HttpURLConnection)url.openConnection();
+ } catch (UndeclaredThrowableException e) {
+ throw new IOException(e.getCause());
+ }
+ }
+ }
+
+ private static TimelineEntity newEntity(String type, String id) {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setIdentifier(new TimelineEntity.Identifier(type, id));
+ return entity;
+ }
+
+ private static TimelineMetric newMetric(TimelineMetric.Type type,
+ String id, long ts, Number value) {
+ TimelineMetric metric = new TimelineMetric(type);
+ metric.setId(id);
+ metric.addValue(ts, value);
+ return metric;
+ }
+
+ private static boolean verifyMetricValues(Map<Long, Number> m1,
+ Map<Long, Number> m2) {
+ for (Map.Entry<Long, Number> entry : m1.entrySet()) {
+ if (!m2.containsKey(entry.getKey())) {
+ return false;
+ }
+ if (m2.get(entry.getKey()).equals(entry.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean verifyMetrics(
+ TimelineMetric m, TimelineMetric... metrics) {
+ for (TimelineMetric metric : metrics) {
+ if (!metric.equals(m)) {
+ continue;
+ }
+ if (!verifyMetricValues(metric.getValues(), m.getValues())) {
+ continue;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private static void verifyHttpResponse(Client client, URI uri,
+ Status status) {
+ ClientResponse resp =
+ client.resource(uri).accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertNotNull(resp);
+ assertTrue("Response from server should have been " + status,
+ resp.getClientResponseStatus().equals(status));
+ }
+
+ @Test
+ public void testGetFlowRun() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowrun/cluster1/flow_name/1002345678919?userid=user1");
+ ClientResponse resp = getResponse(client, uri);
+ FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entity);
+ assertEquals("user1@flow_name/1002345678919", entity.getId());
+ assertEquals(2, entity.getMetrics().size());
+ TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "HDFS_BYTES_READ", ts - 80000, 57L);
+ TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "MAP_SLOT_MILLIS", ts - 80000, 141L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m1, m2));
+ }
+
+ // Query without specifying cluster ID.
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowrun/flow_name/1002345678919?userid=user1");
+ resp = getResponse(client, uri);
+ entity = resp.getEntity(FlowRunEntity.class);
+ assertNotNull(entity);
+ assertEquals("user1@flow_name/1002345678919", entity.getId());
+ assertEquals(2, entity.getMetrics().size());
+ m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "HDFS_BYTES_READ", ts - 80000, 57L);
+ m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "MAP_SLOT_MILLIS", ts - 80000, 141L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m1, m2));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlows() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flows/cluster1");
+ ClientResponse resp = getResponse(client, uri);
+ Set<FlowActivityEntity> entities =
+ resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (FlowActivityEntity entity : entities) {
+ assertTrue((entity.getId().endsWith("@flow_name") &&
+ entity.getFlowRuns().size() == 2) ||
+ (entity.getId().endsWith("@flow_name2") &&
+ entity.getFlowRuns().size() == 1));
+ }
+
+ // Query without specifying cluster ID.
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flows/");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flows/cluster1?limit=1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetApp() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/app/cluster1/application_1111111111_1111?" +
+ "userid=user1&fields=ALL&flowid=flow_name&flowrunid=1002345678919");
+ ClientResponse resp = getResponse(client, uri);
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("application_1111111111_1111", entity.getId());
+ assertEquals(2, entity.getMetrics().size());
+ TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "HDFS_BYTES_READ", ts - 100000, 31L);
+ m1.addValue(ts - 80000, 57L);
+ TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "MAP_SLOT_MILLIS", ts - 100000, 2L);
+ m2.addValue(ts - 80000, 40L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m1, m2));
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/app/application_1111111111_2222?userid=user1" +
+ "&fields=metrics&flowid=flow_name&flowrunid=1002345678919");
+ resp = getResponse(client, uri);
+ entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("application_1111111111_2222", entity.getId());
+ assertEquals(1, entity.getMetrics().size());
+ TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "MAP_SLOT_MILLIS", ts - 100000, 5L);
+ m2.addValue(ts - 80000, 101L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m3));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetAppWithoutFlowInfo() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/app/cluster1/application_1111111111_1111?" +
+ "userid=user1&fields=ALL");
+ ClientResponse resp = getResponse(client, uri);
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("application_1111111111_1111", entity.getId());
+ assertEquals(2, entity.getMetrics().size());
+ TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "HDFS_BYTES_READ", ts - 100000, 31L);
+ m1.addValue(ts - 80000, 57L);
+ TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "MAP_SLOT_MILLIS", ts - 100000, 2L);
+ m2.addValue(ts - 80000, 40L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m1, m2));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowRunApps() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowrunapps/cluster1/flow_name/1002345678919?" +
+ "userid=user1&fields=ALL");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue("Unexpected app in result",
+ (entity.getId().equals("application_1111111111_1111") &&
+ entity.getMetrics().size() == 2) ||
+ (entity.getId().equals("application_1111111111_2222") &&
+ entity.getMetrics().size() == 1));
+ }
+
+ // Query without specifying cluster ID.
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowrunapps/flow_name/1002345678919?userid=user1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowrunapps/flow_name/1002345678919?userid=user1&limit=1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowApps() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowapps/cluster1/flow_name?userid=user1&fields=ALL");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(3, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue("Unexpected app in result",
+ (entity.getId().equals("application_1111111111_1111") &&
+ entity.getMetrics().size() == 2) ||
+ (entity.getId().equals("application_1111111111_2222") &&
+ entity.getMetrics().size() == 1) ||
+ (entity.getId().equals("application_1111111111_2224") &&
+ entity.getMetrics().size() == 0));
+ }
+
+ // Query without specifying cluster ID.
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowapps/flow_name?userid=user1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(3, entities.size());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowapps/flow_name?userid=user1&limit=1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowAppsFilters() throws Exception {
+ Client client = createClient();
+ try {
+ String entityType = TimelineEntityType.YARN_APPLICATION.toString();
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowapps/cluster1/flow_name?userid=user1&eventfilters=" +
+ ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ assertTrue("Unexpected app in result", entities.contains(
+ newEntity(entityType, "application_1111111111_1111")));
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowapps/cluster1/flow_name?userid=user1&metricfilters=" +
+ "HDFS_BYTES_READ");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ assertTrue("Unexpected app in result", entities.contains(
+ newEntity(entityType, "application_1111111111_1111")));
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowapps/cluster1/flow_name?userid=user1&conffilters=" +
+ "cfg1:value1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ assertTrue("Unexpected app in result", entities.contains(
+ newEntity(entityType, "application_1111111111_2222")));
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowRunNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1");
+ verifyHttpResponse(client, uri, Status.NOT_FOUND);
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowsNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flows/cluster2");
+ ClientResponse resp = getResponse(client, uri);
+ Set<FlowActivityEntity> entities =
+ resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetAppNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/app/cluster1/flow_name/1002345678919/" +
+ "application_1111111111_1378?userid=user1");
+ verifyHttpResponse(client, uri, Status.NOT_FOUND);
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowRunAppsNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowrunapps/cluster2/flow_name/1002345678919");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowAppsNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flowapps/cluster2/flow_name55");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @After
+ public void stop() throws Exception {
+ if (server != null) {
+ server.stop();
+ server = null;
+ }
+ }
+}