You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2017/08/22 13:34:47 UTC
[12/51] [abbrv] hadoop git commit: YARN-6064. Support fromId for
flowRuns and flow/flowRun apps REST API's (Rohith Sharma K S via Varun
Saxena)
YARN-6064. Support fromId for flowRuns and flow/flowRun apps REST API's (Rohith Sharma K S via Varun Saxena)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/800fb4e9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/800fb4e9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/800fb4e9
Branch: refs/heads/YARN-5355
Commit: 800fb4e95664ce3005505eff860e4abd649e620b
Parents: 889aa68
Author: Varun Saxena <va...@apache.org>
Authored: Wed Jan 18 10:30:15 2017 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Tue Aug 22 19:03:02 2017 +0530
----------------------------------------------------------------------
...stTimelineReaderWebServicesHBaseStorage.java | 256 +++++++++++++++----
.../storage/reader/ApplicationEntityReader.java | 43 +++-
.../storage/reader/FlowRunEntityReader.java | 29 ++-
.../reader/TimelineReaderWebServices.java | 72 ++++--
4 files changed, 327 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/800fb4e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 7d9d46a..69959cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -40,6 +40,8 @@ import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.junit.After;
import org.junit.AfterClass;
@@ -352,6 +355,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
flowVersion2, runid2, entity3.getId(), te3);
hbi.write(cluster, user, flow, flowVersion, runid,
"application_1111111111_1111", userEntities);
+ writeApplicationEntities(hbi);
hbi.flush();
} finally {
if (hbi != null) {
@@ -360,6 +364,35 @@ public class TestTimelineReaderWebServicesHBaseStorage {
}
}
+ static void writeApplicationEntities(HBaseTimelineWriterImpl hbi)
+ throws IOException {
+ long currentTimeMillis = System.currentTimeMillis();
+ int count = 1;
+ for (long i = 1; i <= 3; i++) {
+ for (int j = 1; j <= 5; j++) {
+ TimelineEntities te = new TimelineEntities();
+ ApplicationId appId =
+ BuilderUtils.newApplicationId(currentTimeMillis, count++);
+ ApplicationEntity appEntity = new ApplicationEntity();
+ appEntity.setId(appId.toString());
+ appEntity.setCreatedTime(currentTimeMillis);
+
+ TimelineEvent created = new TimelineEvent();
+ created.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ created.setTimestamp(currentTimeMillis);
+ appEntity.addEvent(created);
+ TimelineEvent finished = new TimelineEvent();
+ finished.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ finished.setTimestamp(currentTimeMillis + i * j);
+
+ appEntity.addEvent(finished);
+ te.addEntity(appEntity);
+ hbi.write("cluster1", "user1", "flow1", "CF7022C10F1354", i,
+ appEntity.getId(), te);
+ }
+ }
+ }
+
@AfterClass
public static void tearDown() throws Exception {
util.shutdownMiniCluster();
@@ -719,7 +752,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
Set<FlowActivityEntity> flowEntities =
resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
assertNotNull(flowEntities);
- assertEquals(2, flowEntities.size());
+ assertEquals(3, flowEntities.size());
List<String> listFlowUIDs = new ArrayList<String>();
for (FlowActivityEntity entity : flowEntities) {
String flowUID =
@@ -731,7 +764,9 @@ public class TestTimelineReaderWebServicesHBaseStorage {
assertTrue((entity.getId().endsWith("@flow_name") &&
entity.getFlowRuns().size() == 2) ||
(entity.getId().endsWith("@flow_name2") &&
- entity.getFlowRuns().size() == 1));
+ entity.getFlowRuns().size() == 1)
+ || (entity.getId().endsWith("@flow1")
+ && entity.getFlowRuns().size() == 3));
}
// Query flowruns based on UID returned in query above.
@@ -753,7 +788,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
flowRunUID);
}
}
- assertEquals(3, listFlowRunUIDs.size());
+ assertEquals(6, listFlowRunUIDs.size());
// Query single flowrun based on UIDs' returned in query to get flowruns.
for (String flowRunUID : listFlowRunUIDs) {
@@ -785,7 +820,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
context.getFlowRunId(), entity.getId(), null, null)), appUID);
}
}
- assertEquals(4, listAppUIDs.size());
+ assertEquals(19, listAppUIDs.size());
// Query single app based on UIDs' returned in query to get apps.
for (String appUID : listAppUIDs) {
@@ -966,32 +1001,20 @@ public class TestTimelineReaderWebServicesHBaseStorage {
try {
URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows");
- ClientResponse resp = getResponse(client, uri);
- Set<FlowActivityEntity> entities =
- resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(2, entities.size());
- for (FlowActivityEntity entity : entities) {
- assertTrue((entity.getId().endsWith("@flow_name") &&
- entity.getFlowRuns().size() == 2) ||
- (entity.getId().endsWith("@flow_name2") &&
- entity.getFlowRuns().size() == 1));
- }
+
+ verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+ new String[] {"flow1", "flow_name", "flow_name2"});
// Query without specifying cluster ID.
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/flows/");
- resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(2, entities.size());
+ verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+ new String[] {"flow1", "flow_name", "flow_name2"});
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?limit=1");
- resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(1, entities.size());
+ verifyFlowEntites(client, uri, 1, new int[] {3},
+ new String[] {"flow1"});
long firstFlowActivity =
HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
@@ -1001,40 +1024,25 @@ public class TestTimelineReaderWebServicesHBaseStorage {
"timeline/clusters/cluster1/flows?daterange="
+ fmt.format(firstFlowActivity) + "-"
+ fmt.format(dayTs));
- resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(2, entities.size());
- for (FlowActivityEntity entity : entities) {
- assertTrue((entity.getId().endsWith("@flow_name") &&
- entity.getFlowRuns().size() == 2) ||
- (entity.getId().endsWith("@flow_name2") &&
- entity.getFlowRuns().size() == 1));
- }
+ verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+ new String[] {"flow1", "flow_name", "flow_name2"});
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=" +
fmt.format(dayTs + (4*86400000L)));
- resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(0, entities.size());
+ verifyFlowEntites(client, uri, 0, new int[] {}, new String[] {});
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=-" +
fmt.format(dayTs));
- resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(2, entities.size());
+ verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+ new String[] {"flow1", "flow_name", "flow_name2"});
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=" +
fmt.format(firstFlowActivity) + "-");
- resp = getResponse(client, uri);
- entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
- assertNotNull(entities);
- assertEquals(2, entities.size());
+ verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1},
+ new String[] {"flow1", "flow_name", "flow_name2"});
uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
"timeline/clusters/cluster1/flows?daterange=20150711:20150714");
@@ -2267,4 +2275,162 @@ public class TestTimelineReaderWebServicesHBaseStorage {
}
return entity;
}
+
+ private void verifyFlowEntites(Client client, URI uri, int noOfEntities,
+ int[] a, String[] flowsInSequence) throws Exception {
+ ClientResponse resp = getResponse(client, uri);
+ List<FlowActivityEntity> entities =
+ resp.getEntity(new GenericType<List<FlowActivityEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(noOfEntities, entities.size());
+ assertEquals(noOfEntities, flowsInSequence.length);
+ assertEquals(noOfEntities, a.length);
+ int count = 0;
+ for (FlowActivityEntity timelineEntity : entities) {
+ assertEquals(flowsInSequence[count],
+ timelineEntity.getInfo().get("SYSTEM_INFO_FLOW_NAME"));
+ assertEquals(a[count++], timelineEntity.getFlowRuns().size());
+ }
+ }
+
+ @Test
+ public void testForFlowAppsPagination() throws Exception {
+ Client client = createClient();
+ try {
+ // 15 app entities are stored for flow1 during initialization.
+ int totalAppEntities = 15;
+ String resourceUri = "http://localhost:" + serverPort + "/ws/v2/"
+ + "timeline/clusters/cluster1/users/user1/flows/flow1/apps";
+ URI uri = URI.create(resourceUri);
+ ClientResponse resp = getResponse(client, uri);
+ List<TimelineEntity> entities =
+ resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(totalAppEntities, entities.size());
+ TimelineEntity entity1 = entities.get(0);
+ TimelineEntity entity15 = entities.get(totalAppEntities - 1);
+
+ int limit = 10;
+ String queryParam = "?limit=" + limit;
+ uri = URI.create(resourceUri + queryParam);
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(limit, entities.size());
+ assertEquals(entity1, entities.get(0));
+ TimelineEntity entity10 = entities.get(limit - 1);
+
+ uri =
+ URI.create(resourceUri + queryParam + "&fromid=" + entity10.getId());
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(6, entities.size());
+ assertEquals(entity10, entities.get(0));
+ assertEquals(entity15, entities.get(5));
+
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testForFlowRunAppsPagination() throws Exception {
+ Client client = createClient();
+ try {
+ // 5 app entities are stored for flow run 1 during initialization.
+ int totalAppEntities = 5;
+ String resourceUri = "http://localhost:" + serverPort + "/ws/v2/"
+ + "timeline/clusters/cluster1/users/user1/flows/flow1/runs/1/apps";
+ URI uri = URI.create(resourceUri);
+ ClientResponse resp = getResponse(client, uri);
+ List<TimelineEntity> entities =
+ resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(totalAppEntities, entities.size());
+ TimelineEntity entity1 = entities.get(0);
+ TimelineEntity entity5 = entities.get(totalAppEntities - 1);
+
+ int limit = 3;
+ String queryParam = "?limit=" + limit;
+ uri = URI.create(resourceUri + queryParam);
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(limit, entities.size());
+ assertEquals(entity1, entities.get(0));
+ TimelineEntity entity3 = entities.get(limit - 1);
+
+ uri =
+ URI.create(resourceUri + queryParam + "&fromid=" + entity3.getId());
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(3, entities.size());
+ assertEquals(entity3, entities.get(0));
+ assertEquals(entity5, entities.get(2));
+
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testForFlowRunsPagination() throws Exception {
+ Client client = createClient();
+ try {
+ // 3 flow runs are stored for flow1 during initialization.
+ int totalRuns = 3;
+ String resourceUri = "http://localhost:" + serverPort + "/ws/v2/"
+ + "timeline/clusters/cluster1/users/user1/flows/flow1/runs";
+ URI uri = URI.create(resourceUri);
+ ClientResponse resp = getResponse(client, uri);
+ List<TimelineEntity> entities =
+ resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(totalRuns, entities.size());
+ TimelineEntity entity1 = entities.get(0);
+ TimelineEntity entity3 = entities.get(totalRuns - 1);
+
+ int limit = 2;
+ String queryParam = "?limit=" + limit;
+ uri = URI.create(resourceUri + queryParam);
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(limit, entities.size());
+ assertEquals(entity1, entities.get(0));
+ TimelineEntity entity2 = entities.get(limit - 1);
+
+ uri = URI.create(resourceUri + queryParam + "&fromid="
+ + entity2.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID"));
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(limit, entities.size());
+ assertEquals(entity2, entities.get(0));
+ assertEquals(entity3, entities.get(1));
+
+ uri = URI.create(resourceUri + queryParam + "&fromid="
+ + entity3.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID"));
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<List<TimelineEntity>>() {
+ });
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ assertEquals(entity3, entities.get(0));
+ } finally {
+ client.destroy();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/800fb4e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
index 1667f61..8a331c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -48,7 +48,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
@@ -359,13 +361,44 @@ class ApplicationEntityReader extends GenericEntityReader {
Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
TimelineReaderContext context = getContext();
+ RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix = null;
+
// Whether or not flowRunID is null doesn't matter, the
// ApplicationRowKeyPrefix will do the right thing.
- RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix =
- new ApplicationRowKeyPrefix(context.getClusterId(),
- context.getUserId(), context.getFlowName(),
- context.getFlowRunId());
- scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
+ // default mode, will always scan from the beginning of the entity type.
+ if (getFilters().getFromId() == null) {
+ applicationRowKeyPrefix = new ApplicationRowKeyPrefix(
+ context.getClusterId(), context.getUserId(), context.getFlowName(),
+ context.getFlowRunId());
+ scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
+ } else {
+ Long flowRunId = context.getFlowRunId();
+ if (flowRunId == null) {
+ AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(
+ context.getClusterId(), getFilters().getFromId());
+ FlowContext flowContext =
+ lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
+ flowRunId = flowContext.getFlowRunId();
+ }
+
+ ApplicationRowKey applicationRowKey =
+ new ApplicationRowKey(context.getClusterId(), context.getUserId(),
+ context.getFlowName(), flowRunId, getFilters().getFromId());
+
+ // set start row
+ scan.setStartRow(applicationRowKey.getRowKey());
+
+ // get the bytes for stop row
+ applicationRowKeyPrefix = new ApplicationRowKeyPrefix(
+ context.getClusterId(), context.getUserId(), context.getFlowName(),
+ context.getFlowRunId());
+
+ // set stop row
+ scan.setStopRow(
+ HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
+ applicationRowKeyPrefix.getRowKeyPrefix()));
+ }
+
FilterList newList = new FilterList();
newList.addFilter(new PageFilter(getFilters().getLimit()));
if (filterList != null && !filterList.getFilters().isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/800fb4e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
index 9b8482c..cedf96a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
@@ -210,10 +211,30 @@ class FlowRunEntityReader extends TimelineEntityReader {
FilterList filterList) throws IOException {
Scan scan = new Scan();
TimelineReaderContext context = getContext();
- RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix =
- new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(),
- context.getFlowName());
- scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
+ RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix = null;
+ if (getFilters().getFromId() == null) {
+ flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(),
+ context.getUserId(), context.getFlowName());
+ scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
+ } else {
+
+ FlowRunRowKey flowRunRowKey =
+ new FlowRunRowKey(context.getClusterId(), context.getUserId(),
+ context.getFlowName(), Long.parseLong(getFilters().getFromId()));
+
+ // set start row
+ scan.setStartRow(flowRunRowKey.getRowKey());
+
+ // get the bytes for stop row
+ flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(),
+ context.getUserId(), context.getFlowName());
+
+ // set stop row
+ scan.setStopRow(
+ HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix(
+ flowRunRowKeyPrefix.getRowKeyPrefix()));
+ }
+
FilterList newList = new FilterList();
newList.addFilter(new PageFilter(getFilters().getLimit()));
if (filterList != null && !filterList.getFilters().isEmpty()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/800fb4e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index 9f98ff9..7133528 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -1098,6 +1098,9 @@ public class TimelineReaderWebServices {
* METRICS makes sense for flow runs hence only ALL or METRICS are
* supported as fields for fetching flow runs. Other fields will lead to
* HTTP 400 (Bad Request) response. (Optional query param).
+ * @param fromId Defines the flow run id. If specified, retrieve the next
+ * set of flow runs from the given id. The set of flow runs retrieved
+ * is inclusive of specified fromId.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>FlowRunEntity</cite> instances for the given flow are
@@ -1119,7 +1122,8 @@ public class TimelineReaderWebServices {
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
- @QueryParam("fields") String fields) {
+ @QueryParam("fields") String fields,
+ @QueryParam("fromid") String fromId) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
@@ -1141,11 +1145,12 @@ public class TimelineReaderWebServices {
entities = timelineReaderManager.getEntities(context,
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, null, null, null,
- null, null, null, null, null),
+ null, null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null));
} catch (Exception e) {
- handleException(e, url, startTime, "createdTime start/end or limit");
+ handleException(e, url, startTime,
+ "createdTime start/end or limit or fromId");
}
long endTime = Time.monotonicNow();
if (entities == null) {
@@ -1183,6 +1188,9 @@ public class TimelineReaderWebServices {
* METRICS makes sense for flow runs hence only ALL or METRICS are
* supported as fields for fetching flow runs. Other fields will lead to
* HTTP 400 (Bad Request) response. (Optional query param).
+ * @param fromId Defines the flow run id. If specified, retrieve the next
+ * set of flow runs from the given id. The set of flow runs retrieved
+ * is inclusive of specified fromId.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>FlowRunEntity</cite> instances for the given flow are
@@ -1205,9 +1213,10 @@ public class TimelineReaderWebServices {
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
- @QueryParam("fields") String fields) {
+ @QueryParam("fields") String fields,
+ @QueryParam("fromid") String fromId) {
return getFlowRuns(req, res, null, userId, flowName, limit,
- createdTimeStart, createdTimeEnd, metricsToRetrieve, fields);
+ createdTimeStart, createdTimeEnd, metricsToRetrieve, fields, fromId);
}
/**
@@ -1238,6 +1247,9 @@ public class TimelineReaderWebServices {
* METRICS makes sense for flow runs hence only ALL or METRICS are
* supported as fields for fetching flow runs. Other fields will lead to
* HTTP 400 (Bad Request) response. (Optional query param).
+ * @param fromId Defines the flow run id. If specified, retrieve the next
+ * set of flow runs from the given id. The set of flow runs retrieved
+ * is inclusive of specified fromId.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing a
* set of <cite>FlowRunEntity</cite> instances for the given flow are
@@ -1261,7 +1273,8 @@ public class TimelineReaderWebServices {
@QueryParam("createdtimestart") String createdTimeStart,
@QueryParam("createdtimeend") String createdTimeEnd,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
- @QueryParam("fields") String fields) {
+ @QueryParam("fields") String fields,
+ @QueryParam("fromid") String fromId) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
@@ -1280,11 +1293,12 @@ public class TimelineReaderWebServices {
TimelineEntityType.YARN_FLOW_RUN.toString(), null, null),
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, null, null, null,
- null, null, null, null, null),
+ null, null, null, null, fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
null, metricsToRetrieve, fields, null));
} catch (Exception e) {
- handleException(e, url, startTime, "createdTime start/end or limit");
+ handleException(e, url, startTime,
+ "createdTime start/end or limit or fromId");
}
long endTime = Time.monotonicNow();
if (entities == null) {
@@ -1720,6 +1734,9 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
+ * @param fromId Defines the application id. If specified, retrieve the next
+ * set of applications from the given id. The set of applications
+ * retrieved is inclusive of specified fromId.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1749,7 +1766,8 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
- @QueryParam("metricslimit") String metricsLimit) {
+ @QueryParam("metricslimit") String metricsLimit,
+ @QueryParam("fromid") String fromId) {
String url = req.getRequestURI() +
(req.getQueryString() == null ? "" :
QUERY_STRING_SEP + req.getQueryString());
@@ -1772,7 +1790,7 @@ public class TimelineReaderWebServices {
TimelineReaderWebServicesUtils.createTimelineEntityFilters(
limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters, null,
- null),
+ fromId),
TimelineReaderWebServicesUtils.createTimelineDataToRetrieve(
confsToRetrieve, metricsToRetrieve, fields, metricsLimit));
} catch (Exception e) {
@@ -1848,6 +1866,9 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
+ * @param fromId Defines the application id. If specified, retrieve the next
+ * set of applications from the given id. The set of applications
+ * retrieved is inclusive of specified fromId.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1879,12 +1900,13 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
- @QueryParam("metricslimit") String metricsLimit) {
+ @QueryParam("metricslimit") String metricsLimit,
+ @QueryParam("fromid") String fromId) {
return getEntities(req, res, null, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
- confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
+ confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
}
/**
@@ -1948,6 +1970,9 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
+ * @param fromId Defines the application id. If specified, retrieve the next
+ * set of applications from the given id. The set of applications
+ * retrieved is inclusive of specified fromId.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -1981,12 +2006,13 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
- @QueryParam("metricslimit") String metricsLimit) {
+ @QueryParam("metricslimit") String metricsLimit,
+ @QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo,
isRelatedTo, infofilters, conffilters, metricfilters, eventfilters,
- confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
+ confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
}
/**
@@ -2047,6 +2073,9 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
+ * @param fromId Defines the application id. If specified, retrieve the next
+ * set of applications from the given id. The set of applications
+ * retrieved is inclusive of specified fromId.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -2077,12 +2106,13 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
- @QueryParam("metricslimit") String metricsLimit) {
+ @QueryParam("metricslimit") String metricsLimit,
+ @QueryParam("fromid") String fromId) {
return getEntities(req, res, null, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
- confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
+ confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
}
/**
@@ -2144,6 +2174,9 @@ public class TimelineReaderWebServices {
* or has a value less than 1, and metrics have to be retrieved, then
* metricsLimit will be considered as 1 i.e. latest single value of
* metric(s) will be returned. (Optional query param).
+ * @param fromId Defines the application id. If specified, retrieve the next
+ * set of applications from the given id. The set of applications
+ * retrieved is inclusive of specified fromId.
*
* @return If successful, a HTTP 200(OK) response having a JSON representing
* a set of <cite>TimelineEntity</cite> instances representing apps is
@@ -2175,12 +2208,13 @@ public class TimelineReaderWebServices {
@QueryParam("confstoretrieve") String confsToRetrieve,
@QueryParam("metricstoretrieve") String metricsToRetrieve,
@QueryParam("fields") String fields,
- @QueryParam("metricslimit") String metricsLimit) {
+ @QueryParam("metricslimit") String metricsLimit,
+ @QueryParam("fromid") String fromId) {
return getEntities(req, res, clusterId, null,
TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName,
null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo,
infofilters, conffilters, metricfilters, eventfilters,
- confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null);
+ confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId);
}
/**
@@ -3108,4 +3142,4 @@ public class TimelineReaderWebServices {
" (Took " + (endTime - startTime) + " ms.)");
return results;
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org