You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2016/06/21 23:49:09 UTC
[23/50] [abbrv] hadoop git commit: YARN-5045. hbase unit tests fail
due to dependency issues. (Sangjin Lee via varunsaxena)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ccdec4a1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
new file mode 100644
index 0000000..ca80ed5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -0,0 +1,2008 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+public class TestTimelineReaderWebServicesHBaseStorage {
+ private int serverPort;
+ private TimelineReaderServer server;
+ private static HBaseTestingUtility util;
+ private static long ts = System.currentTimeMillis();
+ private static long dayTs =
+ TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ util = new HBaseTestingUtility();
+ Configuration conf = util.getConfiguration();
+ conf.setInt("hfile.format.version", 3);
+ util.startMiniCluster();
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+ loadData();
+ }
+
+ private static void loadData() throws Exception {
+ String cluster = "cluster1";
+ String user = "user1";
+ String flow = "flow_name";
+ String flowVersion = "CF7022C10F1354";
+ Long runid = 1002345678919L;
+ Long runid1 = 1002345678920L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entity = new TimelineEntity();
+ String id = "application_1111111111_1111";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+ entity.addConfig("cfg2", "value1");
+
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId("MAP_SLOT_MILLIS");
+ Map<Long, Number> metricValues =
+ ImmutableMap.of(ts - 100000, (Number)2, ts - 80000, 40);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ m1 = new TimelineMetric();
+ m1.setId("MAP1_SLOT_MILLIS");
+ metricValues = ImmutableMap.of(ts - 100000, (Number)2, ts - 80000, 40);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ m1 = new TimelineMetric();
+ m1.setId("HDFS_BYTES_READ");
+ metricValues = ImmutableMap.of(ts - 100000, (Number)31, ts - 80000, 57);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity.addMetrics(metrics);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event.setTimestamp(cTime);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+ TimelineEvent event11 = new TimelineEvent();
+ event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ Long expTs = 1425019501000L;
+ event11.setTimestamp(expTs);
+ entity.addEvent(event11);
+
+ te.addEntity(entity);
+
+ // write another application with same metric to this flow
+ TimelineEntities te1 = new TimelineEntities();
+ TimelineEntity entity1 = new TimelineEntity();
+ id = "application_1111111111_2222";
+ type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity1.setId(id);
+ entity1.setType(type);
+ cTime = 1425016501000L;
+ entity1.setCreatedTime(cTime);
+ entity1.addConfig("cfg1", "value1");
+ // add metrics
+ metrics.clear();
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId("MAP_SLOT_MILLIS");
+ metricValues = new HashMap<Long, Number>();
+ metricValues.put(ts - 100000, 5L);
+ metricValues.put(ts - 80000, 101L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+ entity1.addMetrics(metrics);
+ TimelineEvent event1 = new TimelineEvent();
+ event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event1.setTimestamp(cTime);
+ event1.addInfo(expKey, expVal);
+ entity1.addEvent(event1);
+ te1.addEntity(entity1);
+
+ String flow2 = "flow_name2";
+ String flowVersion2 = "CF7022C10F1454";
+ Long runid2 = 2102356789046L;
+ TimelineEntities te3 = new TimelineEntities();
+ TimelineEntity entity3 = new TimelineEntity();
+ id = "application_11111111111111_2223";
+ entity3.setId(id);
+ entity3.setType(type);
+ cTime = 1425016501037L;
+ entity3.setCreatedTime(cTime);
+ TimelineEvent event2 = new TimelineEvent();
+ event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event2.setTimestamp(cTime);
+ event2.addInfo("foo_event", "test");
+ entity3.addEvent(event2);
+ te3.addEntity(entity3);
+
+ TimelineEntities te4 = new TimelineEntities();
+ TimelineEntity entity4 = new TimelineEntity();
+ id = "application_1111111111_2224";
+ entity4.setId(id);
+ entity4.setType(type);
+ cTime = 1425016501034L;
+ entity4.setCreatedTime(cTime);
+ TimelineEvent event4 = new TimelineEvent();
+ event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event4.setTimestamp(cTime);
+ event4.addInfo("foo_event", "test");
+ entity4.addEvent(event4);
+ metrics.clear();
+ m2 = new TimelineMetric();
+ m2.setId("MAP_SLOT_MILLIS");
+ metricValues = ImmutableMap.of(ts - 100000, (Number)5L, ts - 80000, 101L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+ entity4.addMetrics(metrics);
+ te4.addEntity(entity4);
+
+ TimelineEntities te5 = new TimelineEntities();
+ TimelineEntity entity5 = new TimelineEntity();
+ entity5.setId("entity1");
+ entity5.setType("type1");
+ entity5.setCreatedTime(1425016501034L);
+ // add some config entries
+ entity5.addConfigs(ImmutableMap.of("config_param1", "value1",
+ "config_param2", "value2", "cfg_param1", "value3"));
+ entity5.addInfo(ImmutableMap.of("info1", (Object)"cluster1",
+ "info2", 2.0, "info3", 35000, "info4", 36000));
+ metrics = new HashSet<>();
+ m1 = new TimelineMetric();
+ m1.setId("MAP_SLOT_MILLIS");
+ metricValues = ImmutableMap.of(ts - 100000, (Number)2, ts - 80000, 40);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ m1 = new TimelineMetric();
+ m1.setId("HDFS_BYTES_READ");
+ metricValues = ImmutableMap.of(ts - 100000, (Number)31, ts - 80000, 57);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity5.addMetrics(metrics);
+ TimelineEvent event51 = new TimelineEvent();
+ event51.setId("event1");
+ event51.setTimestamp(cTime);
+ entity5.addEvent(event51);
+ TimelineEvent event52 = new TimelineEvent();
+ event52.setId("event2");
+ event52.setTimestamp(cTime);
+ entity5.addEvent(event52);
+ TimelineEvent event53 = new TimelineEvent();
+ event53.setId("event3");
+ event53.setTimestamp(cTime);
+ entity5.addEvent(event53);
+ TimelineEvent event54 = new TimelineEvent();
+ event54.setId("event4");
+ event54.setTimestamp(cTime);
+ entity5.addEvent(event54);
+ Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>();
+ isRelatedTo1.put("type2",
+ Sets.newHashSet("entity21","entity22","entity23","entity24"));
+ isRelatedTo1.put("type4", Sets.newHashSet("entity41","entity42"));
+ isRelatedTo1.put("type1", Sets.newHashSet("entity14","entity15"));
+ isRelatedTo1.put("type3",
+ Sets.newHashSet("entity31", "entity35", "entity32", "entity33"));
+ entity5.addIsRelatedToEntities(isRelatedTo1);
+ Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>();
+ relatesTo1.put("type2",
+ Sets.newHashSet("entity21","entity22","entity23","entity24"));
+ relatesTo1.put("type4", Sets.newHashSet("entity41","entity42"));
+ relatesTo1.put("type1", Sets.newHashSet("entity14","entity15"));
+ relatesTo1.put("type3",
+ Sets.newHashSet("entity31", "entity35", "entity32", "entity33"));
+ entity5.addRelatesToEntities(relatesTo1);
+ te5.addEntity(entity5);
+
+ TimelineEntity entity6 = new TimelineEntity();
+ entity6.setId("entity2");
+ entity6.setType("type1");
+ entity6.setCreatedTime(1425016501034L);
+ entity6.addConfigs(ImmutableMap.of("cfg_param3", "value1",
+ "configuration_param2", "value2", "config_param1", "value3"));
+ entity6.addInfo(ImmutableMap.of("info1", (Object)"cluster2",
+ "info2", 2.0, "info4", 35000));
+ metrics = new HashSet<>();
+ m1 = new TimelineMetric();
+ m1.setId("MAP1_SLOT_MILLIS");
+ metricValues = ImmutableMap.of(ts - 100000, (Number)12, ts - 80000, 140);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ m1 = new TimelineMetric();
+ m1.setId("HDFS_BYTES_READ");
+ metricValues = ImmutableMap.of(ts - 100000, (Number)78, ts - 80000, 157);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ m1 = new TimelineMetric();
+ m1.setId("MAP11_SLOT_MILLIS");
+ m1.setType(Type.SINGLE_VALUE);
+ m1.addValue(ts - 100000, 122);
+ metrics.add(m1);
+ entity6.addMetrics(metrics);
+ TimelineEvent event61 = new TimelineEvent();
+ event61.setId("event1");
+ event61.setTimestamp(cTime);
+ entity6.addEvent(event61);
+ TimelineEvent event62 = new TimelineEvent();
+ event62.setId("event5");
+ event62.setTimestamp(cTime);
+ entity6.addEvent(event62);
+ TimelineEvent event63 = new TimelineEvent();
+ event63.setId("event3");
+ event63.setTimestamp(cTime);
+ entity6.addEvent(event63);
+ TimelineEvent event64 = new TimelineEvent();
+ event64.setId("event6");
+ event64.setTimestamp(cTime);
+ entity6.addEvent(event64);
+ Map<String, Set<String>> isRelatedTo2 = new HashMap<String, Set<String>>();
+ isRelatedTo2.put("type2",
+ Sets.newHashSet("entity21","entity22","entity23","entity24"));
+ isRelatedTo2.put("type5", Sets.newHashSet("entity51","entity52"));
+ isRelatedTo2.put("type6", Sets.newHashSet("entity61","entity66"));
+ isRelatedTo2.put("type3", Sets.newHashSet("entity31"));
+ entity6.addIsRelatedToEntities(isRelatedTo2);
+ Map<String, Set<String>> relatesTo2 = new HashMap<String, Set<String>>();
+ relatesTo2.put("type2",
+ Sets.newHashSet("entity21","entity22","entity23","entity24"));
+ relatesTo2.put("type5", Sets.newHashSet("entity51","entity52"));
+ relatesTo2.put("type6", Sets.newHashSet("entity61","entity66"));
+ relatesTo2.put("type3", Sets.newHashSet("entity31"));
+ entity6.addRelatesToEntities(relatesTo2);
+ te5.addEntity(entity6);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te);
+ hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1);
+ hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4);
+ hbi.write(cluster, user, flow2,
+ flowVersion2, runid2, entity3.getId(), te3);
+ hbi.write(cluster, user, flow, flowVersion, runid,
+ "application_1111111111_1111", te5);
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ @Before
+ public void init() throws Exception {
+ try {
+ Configuration config = util.getConfiguration();
+ config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+ config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+ "localhost:0");
+ config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+ config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
+ "org.apache.hadoop.yarn.server.timelineservice.storage." +
+ "HBaseTimelineReaderImpl");
+ config.setInt("hfile.format.version", 3);
+ server = new TimelineReaderServer();
+ server.init(config);
+ server.start();
+ serverPort = server.getWebServerPort();
+ } catch (Exception e) {
+ Assert.fail("Web server failed to start");
+ }
+ }
+
+ private static Client createClient() {
+ ClientConfig cfg = new DefaultClientConfig();
+ cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+ return new Client(new URLConnectionClientHandler(
+ new DummyURLConnectionFactory()), cfg);
+ }
+
+ private static ClientResponse getResponse(Client client, URI uri)
+ throws Exception {
+ ClientResponse resp =
+ client.resource(uri).accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ if (resp == null ||
+ resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+ String msg = new String();
+ if (resp != null) {
+ msg = resp.getClientResponseStatus().toString();
+ }
+ throw new IOException("Incorrect response from timeline reader. " +
+ "Status=" + msg);
+ }
+ return resp;
+ }
+
+ private static class DummyURLConnectionFactory
+ implements HttpURLConnectionFactory {
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
+ try {
+ return (HttpURLConnection)url.openConnection();
+ } catch (UndeclaredThrowableException e) {
+ throw new IOException(e.getCause());
+ }
+ }
+ }
+
+ private static TimelineEntity newEntity(String type, String id) {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setIdentifier(new TimelineEntity.Identifier(type, id));
+ return entity;
+ }
+
+ private static TimelineMetric newMetric(TimelineMetric.Type type,
+ String id, long ts, Number value) {
+ TimelineMetric metric = new TimelineMetric(type);
+ metric.setId(id);
+ metric.addValue(ts, value);
+ return metric;
+ }
+
+ private static boolean verifyMetricValues(Map<Long, Number> m1,
+ Map<Long, Number> m2) {
+ for (Map.Entry<Long, Number> entry : m1.entrySet()) {
+ if (!m2.containsKey(entry.getKey())) {
+ return false;
+ }
+ if (m2.get(entry.getKey()).equals(entry.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean verifyMetrics(
+ TimelineMetric m, TimelineMetric... metrics) {
+ for (TimelineMetric metric : metrics) {
+ if (!metric.equals(m)) {
+ continue;
+ }
+ if (!verifyMetricValues(metric.getValues(), m.getValues())) {
+ continue;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private static void verifyHttpResponse(Client client, URI uri,
+ Status status) {
+ ClientResponse resp =
+ client.resource(uri).accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertNotNull(resp);
+ assertTrue("Response from server should have been " + status,
+ resp.getClientResponseStatus().equals(status));
+ }
+
+ @Test
+ public void testGetFlowRun() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
+ "1002345678919");
+ ClientResponse resp = getResponse(client, uri);
+ FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entity);
+ assertEquals("user1@flow_name/1002345678919", entity.getId());
+ assertEquals(3, entity.getMetrics().size());
+ TimelineMetric m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "HDFS_BYTES_READ", ts - 80000, 57L);
+ TimelineMetric m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "MAP_SLOT_MILLIS", ts - 80000, 141L);
+ TimelineMetric m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "MAP1_SLOT_MILLIS", ts - 80000, 40L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m1, m2, m3));
+ }
+
+ // Query without specifying cluster ID.
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/users/user1/flows/flow_name/runs/1002345678919");
+ resp = getResponse(client, uri);
+ entity = resp.getEntity(FlowRunEntity.class);
+ assertNotNull(entity);
+ assertEquals("user1@flow_name/1002345678919", entity.getId());
+ assertEquals(3, entity.getMetrics().size());
+ m1 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "HDFS_BYTES_READ", ts - 80000, 57L);
+ m2 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "MAP_SLOT_MILLIS", ts - 80000, 141L);
+ m3 = newMetric(TimelineMetric.Type.SINGLE_VALUE,
+ "MAP1_SLOT_MILLIS", ts - 80000, 40L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m1, m2, m3));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowRuns() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs");
+ ClientResponse resp = getResponse(client, uri);
+ Set<FlowRunEntity> entities =
+ resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (FlowRunEntity entity : entities) {
+ assertTrue("Id, run id or start time does not match.",
+ ((entity.getId().equals("user1@flow_name/1002345678919")) &&
+ (entity.getRunId() == 1002345678919L) &&
+ (entity.getStartTime() == 1425016501000L)) ||
+ ((entity.getId().equals("user1@flow_name/1002345678920")) &&
+ (entity.getRunId() == 1002345678920L) &&
+ (entity.getStartTime() == 1425016501034L)));
+ assertEquals(0, entity.getMetrics().size());
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+ "clusters/cluster1/users/user1/flows/flow_name/runs?limit=1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ for (FlowRunEntity entity : entities) {
+ assertTrue("Id, run id or start time does not match.",
+ entity.getId().equals("user1@flow_name/1002345678920") &&
+ entity.getRunId() == 1002345678920L &&
+ entity.getStartTime() == 1425016501034L);
+ assertEquals(0, entity.getMetrics().size());
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
+ "createdtimestart=1425016501030");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ for (FlowRunEntity entity : entities) {
+ assertTrue("Id, run id or start time does not match.",
+ entity.getId().equals("user1@flow_name/1002345678920") &&
+ entity.getRunId() == 1002345678920L &&
+ entity.getStartTime() == 1425016501034L);
+ assertEquals(0, entity.getMetrics().size());
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
+ "createdtimestart=1425016500999&createdtimeend=1425016501035");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (FlowRunEntity entity : entities) {
+ assertTrue("Id, run id or start time does not match.",
+ ((entity.getId().equals("user1@flow_name/1002345678919")) &&
+ (entity.getRunId() == 1002345678919L) &&
+ (entity.getStartTime() == 1425016501000L)) ||
+ ((entity.getId().equals("user1@flow_name/1002345678920")) &&
+ (entity.getRunId() == 1002345678920L) &&
+ (entity.getStartTime() == 1425016501034L)));
+ assertEquals(0, entity.getMetrics().size());
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
+ "createdtimeend=1425016501030");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ for (FlowRunEntity entity : entities) {
+ assertTrue("Id, run id or start time does not match.",
+ entity.getId().equals("user1@flow_name/1002345678919") &&
+ entity.getRunId() == 1002345678919L &&
+ entity.getStartTime() == 1425016501000L);
+ assertEquals(0, entity.getMetrics().size());
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
+ "fields=metrics");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (FlowRunEntity entity : entities) {
+ assertTrue("Id, run id or start time does not match.",
+ ((entity.getId().equals("user1@flow_name/1002345678919")) &&
+ (entity.getRunId() == 1002345678919L) &&
+ (entity.getStartTime() == 1425016501000L) &&
+ (entity.getMetrics().size() == 3)) ||
+ ((entity.getId().equals("user1@flow_name/1002345678920")) &&
+ (entity.getRunId() == 1002345678920L) &&
+ (entity.getStartTime() == 1425016501034L) &&
+ (entity.getMetrics().size() == 1)));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+
+ @Test
+ public void testGetFlowRunsMetricsToRetrieve() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
+ "metricstoretrieve=MAP_,HDFS_");
+ ClientResponse resp = getResponse(client, uri);
+ Set<FlowRunEntity> entities =
+ resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ int metricCnt = 0;
+ for (FlowRunEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(metric.getId().startsWith("MAP_") ||
+ metric.getId().startsWith("HDFS_"));
+ }
+ }
+ assertEquals(3, metricCnt);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs?" +
+ "metricstoretrieve=!(MAP_,HDFS_)");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ metricCnt = 0;
+ for (FlowRunEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(metric.getId().startsWith("MAP1_"));
+ }
+ }
+ assertEquals(1, metricCnt);
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetEntitiesByUID() throws Exception {
+ Client client = createClient();
+ try {
+ // Query all flows.
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flows");
+ ClientResponse resp = getResponse(client, uri);
+ Set<FlowActivityEntity> flowEntities =
+ resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(flowEntities);
+ assertEquals(2, flowEntities.size());
+ List<String> listFlowUIDs = new ArrayList<String>();
+ for (FlowActivityEntity entity : flowEntities) {
+ String flowUID =
+ (String)entity.getInfo().get(TimelineReaderManager.UID_KEY);
+ listFlowUIDs.add(flowUID);
+ assertEquals(TimelineUIDConverter.FLOW_UID.encodeUID(
+ new TimelineReaderContext(entity.getCluster(), entity.getUser(),
+ entity.getFlowName(), null, null, null, null)), flowUID);
+ assertTrue((entity.getId().endsWith("@flow_name") &&
+ entity.getFlowRuns().size() == 2) ||
+ (entity.getId().endsWith("@flow_name2") &&
+ entity.getFlowRuns().size() == 1));
+ }
+
+ // Query flowruns based on UID returned in query above.
+ List<String> listFlowRunUIDs = new ArrayList<String>();
+ for (String flowUID : listFlowUIDs) {
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flow-uid/" + flowUID + "/runs");
+ resp = getResponse(client, uri);
+ Set<FlowRunEntity> frEntities =
+ resp.getEntity(new GenericType<Set<FlowRunEntity>>(){});
+ assertNotNull(frEntities);
+ for (FlowRunEntity entity : frEntities) {
+ String flowRunUID =
+ (String)entity.getInfo().get(TimelineReaderManager.UID_KEY);
+ listFlowRunUIDs.add(flowRunUID);
+ assertEquals(TimelineUIDConverter.FLOWRUN_UID.encodeUID(
+ new TimelineReaderContext("cluster1", entity.getUser(),
+ entity.getName(), entity.getRunId(), null, null, null)),
+ flowRunUID);
+ }
+ }
+ assertEquals(3, listFlowRunUIDs.size());
+
+ // Query single flowrun based on UIDs' returned in query to get flowruns.
+ for (String flowRunUID : listFlowRunUIDs) {
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/run-uid/" + flowRunUID);
+ resp = getResponse(client, uri);
+ FlowRunEntity entity = resp.getEntity(FlowRunEntity.class);
+ assertNotNull(entity);
+ }
+
+ // Query apps based on UIDs' returned in query to get flowruns.
+ List<String> listAppUIDs = new ArrayList<String>();
+ for (String flowRunUID : listFlowRunUIDs) {
+ TimelineReaderContext context =
+ TimelineUIDConverter.FLOWRUN_UID.decodeUID(flowRunUID);
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/run-uid/" + flowRunUID + "/apps");
+ resp = getResponse(client, uri);
+ Set<TimelineEntity> appEntities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(appEntities);
+ for (TimelineEntity entity : appEntities) {
+ String appUID =
+ (String)entity.getInfo().get(TimelineReaderManager.UID_KEY);
+ listAppUIDs.add(appUID);
+ assertEquals(TimelineUIDConverter.APPLICATION_UID.encodeUID(
+ new TimelineReaderContext(context.getClusterId(),
+ context.getUserId(), context.getFlowName(),
+ context.getFlowRunId(), entity.getId(), null, null)), appUID);
+ }
+ }
+ assertEquals(4, listAppUIDs.size());
+
+ // Query single app based on UIDs' returned in query to get apps.
+ for (String appUID : listAppUIDs) {
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/app-uid/" + appUID);
+ resp = getResponse(client, uri);
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ }
+
+ // Query entities based on UIDs' returned in query to get apps and
+ // a specific entity type(in this case type1).
+ List<String> listEntityUIDs = new ArrayList<String>();
+ for (String appUID : listAppUIDs) {
+ TimelineReaderContext context =
+ TimelineUIDConverter.APPLICATION_UID.decodeUID(appUID);
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/app-uid/" + appUID + "/entities/type1");
+ resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ for (TimelineEntity entity : entities) {
+ String entityUID =
+ (String)entity.getInfo().get(TimelineReaderManager.UID_KEY);
+ listEntityUIDs.add(entityUID);
+ assertEquals(TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(
+ new TimelineReaderContext(context.getClusterId(),
+ context.getUserId(), context.getFlowName(),
+ context.getFlowRunId(), context.getAppId(), "type1",
+ entity.getId())), entityUID);
+ }
+ }
+ assertEquals(2, listEntityUIDs.size());
+
+ // Query single entity based on UIDs' returned in query to get entities.
+ for (String entityUID : listEntityUIDs) {
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/entity-uid/" + entityUID);
+ resp = getResponse(client, uri);
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flow-uid/dummy:flow/runs");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/run-uid/dummy:flowrun");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+ // Run Id is not a numerical value.
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/run-uid/some:dummy:flow:123v456");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/run-uid/dummy:flowrun/apps");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/app-uid/dummy:app");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/app-uid/dummy:app/entities/type1");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/entity-uid/dummy:entity");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testUIDQueryWithAndWithoutFlowContextInfo() throws Exception {
+ Client client = createClient();
+ try {
+ String appUIDWithFlowInfo =
+ "cluster1!user1!flow_name!1002345678919!application_1111111111_1111";
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/"+
+ "timeline/app-uid/" + appUIDWithFlowInfo);
+ ClientResponse resp = getResponse(client, uri);
+ TimelineEntity appEntity1 = resp.getEntity(TimelineEntity.class);
+ assertNotNull(appEntity1);
+ assertEquals(
+ TimelineEntityType.YARN_APPLICATION.toString(), appEntity1.getType());
+ assertEquals("application_1111111111_1111", appEntity1.getId());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+ "app-uid/" + appUIDWithFlowInfo + "/entities/type1");
+ resp = getResponse(client, uri);
+ Set<TimelineEntity> entities1 =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities1);
+ assertEquals(2, entities1.size());
+ for (TimelineEntity entity : entities1) {
+ assertNotNull(entity.getInfo());
+ assertEquals(1, entity.getInfo().size());
+ String uid =
+ (String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
+ assertNotNull(uid);
+ assertTrue(uid.equals(appUIDWithFlowInfo + "!type1!entity1") ||
+ uid.equals(appUIDWithFlowInfo + "!type1!entity2"));
+ }
+
+ String appUIDWithoutFlowInfo = "cluster1!application_1111111111_1111";
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
+ "app-uid/" + appUIDWithoutFlowInfo);
+ resp = getResponse(client, uri);;
+ TimelineEntity appEntity2 = resp.getEntity(TimelineEntity.class);
+ assertNotNull(appEntity2);
+ assertEquals(
+ TimelineEntityType.YARN_APPLICATION.toString(), appEntity2.getType());
+ assertEquals("application_1111111111_1111", appEntity2.getId());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+ "app-uid/" + appUIDWithoutFlowInfo + "/entities/type1");
+ resp = getResponse(client, uri);
+ Set<TimelineEntity> entities2 =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities2);
+ assertEquals(2, entities2.size());
+ for (TimelineEntity entity : entities2) {
+ assertNotNull(entity.getInfo());
+ assertEquals(1, entity.getInfo().size());
+ String uid =
+ (String) entity.getInfo().get(TimelineReaderManager.UID_KEY);
+ assertNotNull(uid);
+ assertTrue(uid.equals(appUIDWithoutFlowInfo + "!type1!entity1") ||
+ uid.equals(appUIDWithoutFlowInfo + "!type1!entity2"));
+ }
+
+ String entityUIDWithFlowInfo = appUIDWithFlowInfo + "!type1!entity1";
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
+ "entity-uid/" + entityUIDWithFlowInfo);
+ resp = getResponse(client, uri);;
+ TimelineEntity singleEntity1 = resp.getEntity(TimelineEntity.class);
+ assertNotNull(singleEntity1);
+ assertEquals("type1", singleEntity1.getType());
+ assertEquals("entity1", singleEntity1.getId());
+
+ String entityUIDWithoutFlowInfo =
+ appUIDWithoutFlowInfo + "!type1!entity1";
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/"+
+ "entity-uid/" + entityUIDWithoutFlowInfo);
+ resp = getResponse(client, uri);;
+ TimelineEntity singleEntity2 = resp.getEntity(TimelineEntity.class);
+ assertNotNull(singleEntity2);
+ assertEquals("type1", singleEntity2.getType());
+ assertEquals("entity1", singleEntity2.getId());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testUIDNotProperlyEscaped() throws Exception {
+ Client client = createClient();
+ try {
+ String appUID =
+ "cluster1!user*1!flow_name!1002345678919!application_1111111111_1111";
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/"+
+ "timeline/app-uid/" + appUID);
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlows() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows");
+ ClientResponse resp = getResponse(client, uri);
+ Set<FlowActivityEntity> entities =
+ resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (FlowActivityEntity entity : entities) {
+ assertTrue((entity.getId().endsWith("@flow_name") &&
+ entity.getFlowRuns().size() == 2) ||
+ (entity.getId().endsWith("@flow_name2") &&
+ entity.getFlowRuns().size() == 1));
+ }
+
+ // Query without specifying cluster ID.
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/flows/");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows?limit=1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+
+ long firstFlowActivity =
+ TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
+
+ DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows?daterange="
+ + fmt.format(firstFlowActivity) + "-"
+ + fmt.format(dayTs));
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (FlowActivityEntity entity : entities) {
+ assertTrue((entity.getId().endsWith("@flow_name") &&
+ entity.getFlowRuns().size() == 2) ||
+ (entity.getId().endsWith("@flow_name2") &&
+ entity.getFlowRuns().size() == 1));
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows?daterange=" +
+ fmt.format(dayTs + (4*86400000L)));
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows?daterange=-" +
+ fmt.format(dayTs));
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows?daterange=" +
+ fmt.format(firstFlowActivity) + "-");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows?daterange=20150711:20150714");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows?daterange=20150714-20150711");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows?daterange=2015071129-20150712");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/flows?daterange=20150711-2015071243");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetApp() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111?" +
+ "userid=user1&fields=ALL&flowname=flow_name&flowrunid=1002345678919");
+ ClientResponse resp = getResponse(client, uri);
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("application_1111111111_1111", entity.getId());
+ assertEquals(3, entity.getMetrics().size());
+ TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "HDFS_BYTES_READ", ts - 100000, 31L);
+ m1.addValue(ts - 80000, 57L);
+ TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "MAP_SLOT_MILLIS", ts - 100000, 2L);
+ m2.addValue(ts - 80000, 40L);
+ TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "MAP1_SLOT_MILLIS", ts - 100000, 2L);
+ m3.addValue(ts - 80000, 40L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m1, m2, m3));
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/apps/application_1111111111_2222?userid=user1" +
+ "&fields=metrics&flowname=flow_name&flowrunid=1002345678919");
+ resp = getResponse(client, uri);
+ entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("application_1111111111_2222", entity.getId());
+ assertEquals(1, entity.getMetrics().size());
+ TimelineMetric m4 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "MAP_SLOT_MILLIS", ts - 100000, 5L);
+ m4.addValue(ts - 80000, 101L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m4));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetAppWithoutFlowInfo() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111?" +
+ "fields=ALL");
+ ClientResponse resp = getResponse(client, uri);
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("application_1111111111_1111", entity.getId());
+ assertEquals(3, entity.getMetrics().size());
+ TimelineMetric m1 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "HDFS_BYTES_READ", ts - 100000, 31L);
+ m1.addValue(ts - 80000, 57L);
+ TimelineMetric m2 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "MAP_SLOT_MILLIS", ts - 100000, 2L);
+ m2.addValue(ts - 80000, 40L);
+ TimelineMetric m3 = newMetric(TimelineMetric.Type.TIME_SERIES,
+ "MAP1_SLOT_MILLIS", ts - 100000, 2L);
+ m3.addValue(ts - 80000, 40L);
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(verifyMetrics(metric, m1, m2, m3));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetEntityWithoutFlowInfo() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1/entity1");
+ ClientResponse resp = getResponse(client, uri);
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("entity1", entity.getId());
+ assertEquals("type1", entity.getType());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetEntitiesWithoutFlowInfo() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ /**
+ * Tests if specific configs and metrics are retrieve for getEntities call.
+ */
+ @Test
+ public void testGetEntitiesDataToRetrieve() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?confstoretrieve=cfg_");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ int cfgCnt = 0;
+ for (TimelineEntity entity : entities) {
+ cfgCnt += entity.getConfigs().size();
+ for (String configKey : entity.getConfigs().keySet()) {
+ assertTrue(configKey.startsWith("cfg_"));
+ }
+ }
+ assertEquals(2, cfgCnt);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?confstoretrieve=cfg_,config_");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ cfgCnt = 0;
+ for (TimelineEntity entity : entities) {
+ cfgCnt += entity.getConfigs().size();
+ for (String configKey : entity.getConfigs().keySet()) {
+ assertTrue(configKey.startsWith("cfg_") ||
+ configKey.startsWith("config_"));
+ }
+ }
+ assertEquals(5, cfgCnt);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?confstoretrieve=!(cfg_,config_)");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ cfgCnt = 0;
+ for (TimelineEntity entity : entities) {
+ cfgCnt += entity.getConfigs().size();
+ for (String configKey : entity.getConfigs().keySet()) {
+ assertTrue(configKey.startsWith("configuration_"));
+ }
+ }
+ assertEquals(1, cfgCnt);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricstoretrieve=MAP_");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ int metricCnt = 0;
+ for (TimelineEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(metric.getId().startsWith("MAP_"));
+ }
+ }
+ assertEquals(1, metricCnt);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricstoretrieve=MAP1_,HDFS_");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ metricCnt = 0;
+ for (TimelineEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(metric.getId().startsWith("MAP1_") ||
+ metric.getId().startsWith("HDFS_"));
+ }
+ }
+ assertEquals(3, metricCnt);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricstoretrieve=!(MAP1_,HDFS_)");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ metricCnt = 0;
+ for (TimelineEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(metric.getId().startsWith("MAP_") ||
+ metric.getId().startsWith("MAP11_"));
+ }
+ }
+ assertEquals(2, metricCnt);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricstoretrieve=!(MAP1_,HDFS_");
+ verifyHttpResponse(client, uri, Status.BAD_REQUEST);
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetEntitiesConfigFilters() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?conffilters=config_param1%20eq%20value1%20OR%20" +
+ "config_param1%20eq%20value3");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?conffilters=config_param1%20eq%20value1%20AND" +
+ "%20configuration_param2%20eq%20value2");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+
+ // conffilters=(config_param1 eq value1 AND configuration_param2 eq
+ // value2) OR (config_param1 eq value3 AND cfg_param3 eq value1)
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?conffilters=(config_param1%20eq%20value1%20AND" +
+ "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" +
+ "%20value3%20AND%20cfg_param3%20eq%20value1)");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ int cfgCnt = 0;
+ for (TimelineEntity entity : entities) {
+ cfgCnt += entity.getConfigs().size();
+ assertTrue(entity.getId().equals("entity2"));
+ }
+ assertEquals(0, cfgCnt);
+
+ // conffilters=(config_param1 eq value1 AND configuration_param2 eq
+ // value2) OR (config_param1 eq value3 AND cfg_param3 eq value1)
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?conffilters=(config_param1%20eq%20value1%20AND" +
+ "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" +
+ "%20value3%20AND%20cfg_param3%20eq%20value1)&fields=CONFIGS");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ cfgCnt = 0;
+ for (TimelineEntity entity : entities) {
+ cfgCnt += entity.getConfigs().size();
+ assertTrue(entity.getId().equals("entity2"));
+ }
+ assertEquals(3, cfgCnt);
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?conffilters=(config_param1%20eq%20value1%20AND" +
+ "%20configuration_param2%20eq%20value2)%20OR%20(config_param1%20eq" +
+ "%20value3%20AND%20cfg_param3%20eq%20value1)&confstoretrieve=cfg_," +
+ "configuration_");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ cfgCnt = 0;
+ for (TimelineEntity entity : entities) {
+ cfgCnt += entity.getConfigs().size();
+ assertTrue(entity.getId().equals("entity2"));
+ for (String configKey : entity.getConfigs().keySet()) {
+ assertTrue(configKey.startsWith("cfg_") ||
+ configKey.startsWith("configuration_"));
+ }
+ }
+ assertEquals(2, cfgCnt);
+
+ // Test for behavior when compare op is ne(not equals) vs ene
+ // (exists and not equals). configuration_param2 does not exist for
+ // entity1. For ne, both entity1 and entity2 will be returned. For ene,
+ // only entity2 will be returned as we are checking for existence too.
+ // conffilters=configuration_param2 ne value3
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?conffilters=configuration_param2%20ne%20value3");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+ // conffilters=configuration_param2 ene value3
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?conffilters=configuration_param2%20ene%20value3");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity2"));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetEntitiesInfoFilters() throws Exception {
+ Client client = createClient();
+ try {
+ // infofilters=info1 eq cluster1 OR info1 eq cluster2
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?infofilters=info1%20eq%20cluster1%20OR%20info1%20eq" +
+ "%20cluster2");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+
+ // infofilters=info1 eq cluster1 AND info4 eq 35000
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?infofilters=info1%20eq%20cluster1%20AND%20info4%20" +
+ "eq%2035000");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+
+ // infofilters=info4 eq 35000 OR info4 eq 36000
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?infofilters=info4%20eq%2035000%20OR%20info4%20eq" +
+ "%2036000");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+
+ // infofilters=(info1 eq cluster1 AND info4 eq 35000) OR
+ // (info1 eq cluster2 AND info2 eq 2.0)
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?infofilters=(info1%20eq%20cluster1%20AND%20info4%20" +
+ "eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%202.0)");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ int infoCnt = 0;
+ for (TimelineEntity entity : entities) {
+ infoCnt += entity.getInfo().size();
+ assertTrue(entity.getId().equals("entity2"));
+ }
+ // Includes UID in info field even if fields not specified as INFO.
+ assertEquals(1, infoCnt);
+
+ // infofilters=(info1 eq cluster1 AND info4 eq 35000) OR
+ // (info1 eq cluster2 AND info2 eq 2.0)
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?infofilters=(info1%20eq%20cluster1%20AND%20info4%20" +
+ "eq%2035000)%20OR%20(info1%20eq%20cluster2%20AND%20info2%20eq%20" +
+ "2.0)&fields=INFO");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ infoCnt = 0;
+ for (TimelineEntity entity : entities) {
+ infoCnt += entity.getInfo().size();
+ assertTrue(entity.getId().equals("entity2"));
+ }
+ // Includes UID in info field.
+ assertEquals(4, infoCnt);
+
+ // Test for behavior when compare op is ne(not equals) vs ene
+ // (exists and not equals). info3 does not exist for entity2. For ne,
+ // both entity1 and entity2 will be returned. For ene, only entity2 will
+ // be returned as we are checking for existence too.
+ // infofilters=info3 ne 39000
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?infofilters=info3%20ne%2039000");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+ // infofilters=info3 ene 39000
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?infofilters=info3%20ene%2039000");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1"));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetEntitiesMetricFilters() throws Exception {
+ Client client = createClient();
+ try {
+ // metricfilters=HDFS_BYTES_READ lt 60 OR HDFS_BYTES_READ eq 157
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20OR%20" +
+ "HDFS_BYTES_READ%20eq%20157");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+
+ // metricfilters=HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricfilters=HDFS_BYTES_READ%20lt%2060%20AND%20" +
+ "MAP_SLOT_MILLIS%20gt%2040");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+
+ // metricfilters=(HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40) OR
+ // (MAP1_SLOT_MILLIS ge 140 AND MAP11_SLOT_MILLIS le 122)
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" +
+ "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" +
+ "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ int metricCnt = 0;
+ for (TimelineEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ assertTrue(entity.getId().equals("entity2"));
+ }
+ assertEquals(0, metricCnt);
+
+ // metricfilters=(HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40) OR
+ // (MAP1_SLOT_MILLIS ge 140 AND MAP11_SLOT_MILLIS le 122)
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" +
+ "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" +
+ "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&fields=METRICS");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ metricCnt = 0;
+ for (TimelineEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ assertTrue(entity.getId().equals("entity2"));
+ }
+ assertEquals(3, metricCnt);
+
+ // metricfilters=(HDFS_BYTES_READ lt 60 AND MAP_SLOT_MILLIS gt 40) OR
+ // (MAP1_SLOT_MILLIS ge 140 AND MAP11_SLOT_MILLIS le 122)
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricfilters=(HDFS_BYTES_READ%20lt%2060%20AND%20" +
+ "MAP_SLOT_MILLIS%20gt%2040)%20OR%20(MAP1_SLOT_MILLIS%20ge" +
+ "%20140%20AND%20MAP11_SLOT_MILLIS%20le%20122)&metricstoretrieve=" +
+ "!(HDFS)");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ metricCnt = 0;
+ for (TimelineEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ assertTrue(entity.getId().equals("entity2"));
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(metric.getId().startsWith("MAP1"));
+ }
+ }
+ assertEquals(2, metricCnt);
+
+ // Test for behavior when compare op is ne(not equals) vs ene
+ // (exists and not equals). MAP11_SLOT_MILLIS does not exist for
+ // entity1. For ne, both entity1 and entity2 will be returned. For ene,
+ // only entity2 will be returned as we are checking for existence too.
+ // metricfilters=MAP11_SLOT_MILLIS ne 100
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ne%20100");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+ // metricfilters=MAP11_SLOT_MILLIS ene 100
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?metricfilters=MAP11_SLOT_MILLIS%20ene%20100");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity2"));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetEntitiesEventFilters() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?eventfilters=event1,event3");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?eventfilters=!(event1,event3)");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+
+ // eventfilters=!(event1,event3) OR event5,event6
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?eventfilters=!(event1,event3)%20OR%20event5,event6");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity2"));
+ }
+
+ // eventfilters=(!(event1,event3) OR event5,event6) OR
+ // (event1,event2 AND (event3,event4))
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?eventfilters=(!(event1,event3)%20OR%20event5," +
+ "event6)%20OR%20(event1,event2%20AND%20(event3,event4))");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetEntitiesRelationFilters() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1?isrelatedto=type3:entity31,type2:entity21:entity22");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+ "clusters/cluster1/apps/application_1111111111_1111/entities/type1" +
+ "?isrelatedto=!(type3:entity31,type2:entity21:entity22)");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+
+ // isrelatedto=!(type3:entity31,type2:entity21:entity22)OR type5:entity51,
+ // type6:entity61:entity66
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+ "clusters/cluster1/apps/application_1111111111_1111/entities/type1" +
+ "?isrelatedto=!(type3:entity31,type2:entity21:entity22)%20OR%20" +
+ "type5:entity51,type6:entity61:entity66");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity2"));
+ }
+
+ // isrelatedto=(!(type3:entity31,type2:entity21:entity22)OR type5:
+ // entity51,type6:entity61:entity66) OR (type1:entity14,type2:entity21:
+ // entity22 AND (type3:entity32:entity35,type4:entity42))
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+ "clusters/cluster1/apps/application_1111111111_1111/entities/type1" +
+ "?isrelatedto=(!(type3:entity31,type2:entity21:entity22)%20OR%20" +
+ "type5:entity51,type6:entity61:entity66)%20OR%20(type1:entity14," +
+ "type2:entity21:entity22%20AND%20(type3:entity32:entity35,"+
+ "type4:entity42))");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+
+ // relatesto=!(type3:entity31,type2:entity21:entity22)OR type5:entity51,
+ // type6:entity61:entity66
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+ "clusters/cluster1/apps/application_1111111111_1111/entities/type1" +
+ "?relatesto=!%20(type3:entity31,type2:entity21:entity22%20)%20OR%20" +
+ "type5:entity51,type6:entity61:entity66");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity2"));
+ }
+
+ // relatesto=(!(type3:entity31,type2:entity21:entity22)OR type5:entity51,
+ // type6:entity61:entity66) OR (type1:entity14,type2:entity21:entity22 AND
+ // (type3:entity32:entity35 , type4:entity42))
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/timeline/" +
+ "clusters/cluster1/apps/application_1111111111_1111/entities/type1" +
+ "?relatesto=(!(%20type3:entity31,type2:entity21:entity22)%20OR%20" +
+ "type5:entity51,type6:entity61:entity66%20)%20OR%20(type1:entity14," +
+ "type2:entity21:entity22%20AND%20(type3:entity32:entity35%20,%20"+
+ "type4:entity42))");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue(entity.getId().equals("entity1") ||
+ entity.getId().equals("entity2"));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ /**
+ * Tests if specific configs and metrics are retrieve for getEntity call.
+ */
+ @Test
+ public void testGetEntityDataToRetrieve() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1/entity2?confstoretrieve=cfg_,configuration_");
+ ClientResponse resp = getResponse(client, uri);
+ TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("entity2", entity.getId());
+ assertEquals("type1", entity.getType());
+ assertEquals(2, entity.getConfigs().size());
+ for (String configKey : entity.getConfigs().keySet()) {
+ assertTrue(configKey.startsWith("configuration_") ||
+ configKey.startsWith("cfg_"));
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1/entity2?confstoretrieve=!(cfg_,configuration_)");
+ resp = getResponse(client, uri);
+ entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("entity2", entity.getId());
+ assertEquals("type1", entity.getType());
+ assertEquals(1, entity.getConfigs().size());
+ for (String configKey : entity.getConfigs().keySet()) {
+ assertTrue(configKey.startsWith("config_"));
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1/entity2?metricstoretrieve=MAP1_,HDFS_");
+ resp = getResponse(client, uri);
+ entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("entity2", entity.getId());
+ assertEquals("type1", entity.getType());
+ assertEquals(2, entity.getMetrics().size());
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(metric.getId().startsWith("MAP1_") ||
+ metric.getId().startsWith("HDFS_"));
+ }
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1111/" +
+ "entities/type1/entity2?metricstoretrieve=!(MAP1_,HDFS_)");
+ resp = getResponse(client, uri);
+ entity = resp.getEntity(TimelineEntity.class);
+ assertNotNull(entity);
+ assertEquals("entity2", entity.getId());
+ assertEquals("type1", entity.getType());
+ assertEquals(1, entity.getMetrics().size());
+ for (TimelineMetric metric : entity.getMetrics()) {
+ assertTrue(metric.getId().startsWith("MAP11_"));
+ }
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowRunApps() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
+ "1002345678919/apps?fields=ALL");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue("Unexpected app in result",
+ (entity.getId().equals("application_1111111111_1111") &&
+ entity.getMetrics().size() == 3) ||
+ (entity.getId().equals("application_1111111111_2222") &&
+ entity.getMetrics().size() == 1));
+ }
+
+ // Query without specifying cluster ID.
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/users/user1/flows/flow_name/runs/1002345678919/apps");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/users/user1/flows/flow_name/runs/1002345678919/" +
+ "apps?limit=1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowApps() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
+ "fields=ALL");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(3, entities.size());
+ for (TimelineEntity entity : entities) {
+ assertTrue("Unexpected app in result",
+ (entity.getId().equals("application_1111111111_1111") &&
+ entity.getMetrics().size() == 3) ||
+ (entity.getId().equals("application_1111111111_2222") &&
+ entity.getMetrics().size() == 1) ||
+ (entity.getId().equals("application_1111111111_2224") &&
+ entity.getMetrics().size() == 1));
+ }
+
+ // Query without specifying cluster ID.
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/users/user1/flows/flow_name/apps");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(3, entities.size());
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/users/user1/flows/flow_name/apps?limit=1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowAppsFilters() throws Exception {
+ Client client = createClient();
+ try {
+ String entityType = TimelineEntityType.YARN_APPLICATION.toString();
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
+ "eventfilters=" + ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ assertTrue("Unexpected app in result", entities.contains(
+ newEntity(entityType, "application_1111111111_1111")));
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
+ "metricfilters=HDFS_BYTES_READ%20ge%200");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ assertTrue("Unexpected app in result", entities.contains(
+ newEntity(entityType, "application_1111111111_1111")));
+
+ uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/apps?" +
+ "conffilters=cfg1%20eq%20value1");
+ resp = getResponse(client, uri);
+ entities = resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertNotNull(entities);
+ assertEquals(1, entities.size());
+ assertTrue("Unexpected app in result", entities.contains(
+ newEntity(entityType, "application_1111111111_2222")));
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowRunNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/users/user1/flows/flow_name/runs/" +
+ "1002345678929");
+ verifyHttpResponse(client, uri, Status.NOT_FOUND);
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowsNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster2/flows");
+ ClientResponse resp = getResponse(client, uri);
+ Set<FlowActivityEntity> entities =
+ resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetAppNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster1/apps/application_1111111111_1378");
+ verifyHttpResponse(client, uri, Status.NOT_FOUND);
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowRunAppsNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster2/users/user1/flows/flow_name/runs/" +
+ "1002345678919/apps");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @Test
+ public void testGetFlowAppsNotPresent() throws Exception {
+ Client client = createClient();
+ try {
+ URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+ "timeline/clusters/cluster2/users/user1/flows/flow_name55/apps");
+ ClientResponse resp = getResponse(client, uri);
+ Set<TimelineEntity> entities =
+ resp.getEntity(new GenericType<Set<TimelineEntity>>(){});
+ assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+ assertNotNull(entities);
+ assertEquals(0, entities.size());
+ } finally {
+ client.destroy();
+ }
+ }
+
+ @After
+ public void stop() throws Exception {
+ if (server != null) {
+ server.stop();
+ server = null;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org