You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2015/12/02 06:48:31 UTC
[1/2] hadoop git commit: YARN-3862. Support for fetching specific
configs and metrics based on prefixes (Varun Saxena via sjlee)
Repository: hadoop
Updated Branches:
refs/heads/feature-YARN-2928 7c861634e -> cdb96df97
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index 38c0f3f..21ddcc2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -83,6 +83,18 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
return columnPrefix;
}
+ @Override
+ public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
public byte[] getColumnPrefixBytes() {
return columnPrefixBytes.clone();
}
@@ -112,8 +124,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier = ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
@@ -233,8 +244,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier = ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index eb055fe..e3bb52d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -89,8 +89,16 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
return columnPrefixBytes.clone();
}
- public byte[] getColumnPrefixBytes(String qualifier) {
- return ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ @Override
+ public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
}
public byte[] getColumnFamilyBytes() {
@@ -121,8 +129,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier = ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
@@ -149,8 +156,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier = ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
attributes, this.aggOp);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
index 4e23e49..e864d61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
@@ -266,7 +266,7 @@ public class TestFileSystemTimelineReaderImpl {
// only the id, created and modified time
TimelineEntity result =
reader.getEntity("user1", "cluster1", "flow1", 1L, "app1",
- "app", "id_1", null);
+ "app", "id_1", null, null, null);
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString());
@@ -281,7 +281,7 @@ public class TestFileSystemTimelineReaderImpl {
// Cluster and AppId should be enough to get an entity.
TimelineEntity result =
reader.getEntity(null, "cluster1", null, null, "app1",
- "app", "id_1", null);
+ "app", "id_1", null, null, null);
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString());
@@ -298,7 +298,7 @@ public class TestFileSystemTimelineReaderImpl {
// in app flow mapping csv has commas.
TimelineEntity result =
reader.getEntity(null, "cluster1", null, null, "app2",
- "app", "id_5", null);
+ "app", "id_5", null, null, null);
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_5")).toString(),
result.getIdentifier().toString());
@@ -311,7 +311,7 @@ public class TestFileSystemTimelineReaderImpl {
// Specified fields in addition to default view will be returned.
TimelineEntity result =
reader.getEntity("user1", "cluster1", "flow1", 1L,
- "app1", "app", "id_1",
+ "app1", "app", "id_1", null, null,
EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS));
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(),
@@ -329,8 +329,8 @@ public class TestFileSystemTimelineReaderImpl {
public void testGetEntityAllFields() throws Exception {
// All fields of TimelineEntity will be returned.
TimelineEntity result =
- reader.getEntity("user1", "cluster1", "flow1", 1L,
- "app1", "app", "id_1", EnumSet.of(Field.ALL));
+ reader.getEntity("user1", "cluster1", "flow1", 1L, "app1", "app",
+ "id_1", null, null, EnumSet.of(Field.ALL));
Assert.assertEquals(
(new TimelineEntity.Identifier("app", "id_1")).toString(),
result.getIdentifier().toString());
@@ -347,7 +347,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, null, null, null,
- null, null);
+ null, null, null, null);
// All 3 entities will be returned
Assert.assertEquals(4, result.size());
}
@@ -357,7 +357,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
2L, null, null, null, null, null, null, null, null, null,
- null, null);
+ null, null, null, null);
Assert.assertEquals(2, result.size());
// Needs to be rewritten once hashcode and equals for
// TimelineEntity is implemented
@@ -371,7 +371,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
3L, null, null, null, null, null, null, null, null, null,
- null, null);
+ null, null, null, null);
// Even though 2 entities out of 4 have same created time, one entity
// is left out due to limit
Assert.assertEquals(3, result.size());
@@ -383,7 +383,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, 1425016502030L, 1425016502060L, null, null, null, null, null,
- null, null, null, null);
+ null, null, null, null, null, null);
Assert.assertEquals(1, result.size());
// Only one entity with ID id_4 should be returned.
for (TimelineEntity entity : result) {
@@ -396,7 +396,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, 1425016502010L, null, null, null, null, null, null,
- null, null, null);
+ null, null, null, null, null);
Assert.assertEquals(3, result.size());
for (TimelineEntity entity : result) {
if (entity.getId().equals("id_4")) {
@@ -408,7 +408,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, 1425016502010L, null, null, null, null, null, null, null,
- null, null, null);
+ null, null, null, null, null);
Assert.assertEquals(1, result.size());
for (TimelineEntity entity : result) {
if (!entity.getId().equals("id_4")) {
@@ -420,7 +420,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, 1425016502090L, 1425016503020L, null, null, null,
- null, null, null, null);
+ null, null, null, null, null, null);
Assert.assertEquals(2, result.size());
// Two entities with IDs' id_1 and id_4 should be returned.
for (TimelineEntity entity : result) {
@@ -433,7 +433,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, 1425016502090L, null, null, null, null,
- null, null, null);
+ null, null, null, null, null);
Assert.assertEquals(2, result.size());
for (TimelineEntity entity : result) {
if (entity.getId().equals("id_1") || entity.getId().equals("id_4")) {
@@ -445,7 +445,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, 1425016503005L, null, null, null, null, null,
- null, null, null);
+ null, null, null, null, null);
Assert.assertEquals(1, result.size());
for (TimelineEntity entity : result) {
if (!entity.getId().equals("id_4")) {
@@ -462,7 +462,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, infoFilters, null, null,
- null, null);
+ null, null, null, null);
Assert.assertEquals(1, result.size());
// Only one entity with ID id_3 should be returned.
for (TimelineEntity entity : result) {
@@ -478,7 +478,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, null, configFilters, null,
- null, null);
+ null, null, null, null);
Assert.assertEquals(2, result.size());
for (TimelineEntity entity : result) {
if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
@@ -493,7 +493,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, null, null, null,
- eventFilters, null);
+ eventFilters, null, null, null);
Assert.assertEquals(1, result.size());
for (TimelineEntity entity : result) {
if (!entity.getId().equals("id_3")) {
@@ -507,7 +507,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, null, null, null, metricFilters,
- null, null);
+ null, null, null, null);
Assert.assertEquals(2, result.size());
// Two entities with IDs' id_1 and id_2 should be returned.
for (TimelineEntity entity : result) {
@@ -527,7 +527,7 @@ public class TestFileSystemTimelineReaderImpl {
Set<TimelineEntity> result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, relatesTo, null, null, null, null,
- null, null);
+ null, null, null, null);
Assert.assertEquals(1, result.size());
// Only one entity with ID id_1 should be returned.
for (TimelineEntity entity : result) {
@@ -544,7 +544,7 @@ public class TestFileSystemTimelineReaderImpl {
result =
reader.getEntities("user1", "cluster1", "flow1", 1L, "app1", "app",
null, null, null, null, null, null, isRelatedTo, null, null, null,
- null, null);
+ null, null, null, null);
Assert.assertEquals(2, result.size());
// Two entities with IDs' id_1 and id_3 should be returned.
for (TimelineEntity entity : result) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 30ead40..bc7b3a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -49,6 +49,11 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -60,11 +65,17 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
/**
* Various tests to test writing entities to HBase and reading them back from
* it.
@@ -79,18 +90,344 @@ import org.junit.Test;
public class TestHBaseTimelineStorage {
private static HBaseTestingUtility util;
+ private HBaseTimelineReaderImpl reader;
@BeforeClass
public static void setupBeforeClass() throws Exception {
util = new HBaseTestingUtility();
util.startMiniCluster();
createSchema();
+ loadEntities();
+ loadApps();
}
private static void createSchema() throws IOException {
TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
+ private static void loadApps() throws IOException {
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entity = new TimelineEntity();
+ String id = "application_1111111111_2222";
+ entity.setId(id);
+ entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+ Long cTime = 1425016501000L;
+ Long mTime = 1425026901000L;
+ entity.setCreatedTime(cTime);
+ entity.setModifiedTime(mTime);
+ // add the info map in Timeline Entity
+ Map<String, Object> infoMap = new HashMap<String, Object>();
+ infoMap.put("infoMapKey1", "infoMapValue1");
+ infoMap.put("infoMapKey2", 10);
+ entity.addInfo(infoMap);
+ // add the isRelatedToEntity info
+ String key = "task";
+ String value = "is_related_to_entity_id_here";
+ Set<String> isRelatedToSet = new HashSet<String>();
+ isRelatedToSet.add(value);
+ Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+ isRelatedTo.put(key, isRelatedToSet);
+ entity.setIsRelatedToEntities(isRelatedTo);
+ // add the relatesTo info
+ key = "container";
+ value = "relates_to_entity_id_here";
+ Set<String> relatesToSet = new HashSet<String>();
+ relatesToSet.add(value);
+ value = "relates_to_entity_id_here_Second";
+ relatesToSet.add(value);
+ Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+ relatesTo.put(key, relatesToSet);
+ entity.setRelatesToEntities(relatesTo);
+ // add some config entries
+ Map<String, String> conf = new HashMap<String, String>();
+ conf.put("config_param1", "value1");
+ conf.put("config_param2", "value2");
+ conf.put("cfg_param1", "value3");
+ entity.addConfigs(conf);
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId("MAP_SLOT_MILLIS");
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 120000, 100000000);
+ metricValues.put(ts - 100000, 200000000);
+ metricValues.put(ts - 80000, 300000000);
+ metricValues.put(ts - 60000, 400000000);
+ metricValues.put(ts - 40000, 50000000000L);
+ metricValues.put(ts - 20000, 60000000000L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ TimelineMetric m12 = new TimelineMetric();
+ m12.setId("MAP1_BYTES");
+ m12.addValue(ts, 50);
+ metrics.add(m12);
+ entity.addMetrics(metrics);
+ TimelineEvent event = new TimelineEvent();
+ event.setId("event1");
+ event.setTimestamp(ts - 2000);
+ entity.addEvent(event);
+ te.addEntity(entity);
+
+ TimelineEntities te1 = new TimelineEntities();
+ TimelineEntity entity1 = new TimelineEntity();
+ String id1 = "application_1111111111_3333";
+ entity1.setId(id1);
+ entity1.setType(TimelineEntityType.YARN_APPLICATION.toString());
+ entity1.setCreatedTime(cTime);
+ entity1.setModifiedTime(mTime);
+
+ // add the info map in Timeline Entity
+ Map<String, Object> infoMap1 = new HashMap<String, Object>();
+ infoMap1.put("infoMapKey1", "infoMapValue1");
+ infoMap1.put("infoMapKey2", 10);
+ entity1.addInfo(infoMap1);
+
+ // add the isRelatedToEntity info
+ String key1 = "task";
+ String value1 = "is_related_to_entity_id_here";
+ Set<String> isRelatedToSet1 = new HashSet<String>();
+ isRelatedToSet1.add(value1);
+ Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>();
+ isRelatedTo1.put(key, isRelatedToSet1);
+ entity1.setIsRelatedToEntities(isRelatedTo1);
+
+ // add the relatesTo info
+ key1 = "container";
+ value1 = "relates_to_entity_id_here";
+ Set<String> relatesToSet1 = new HashSet<String>();
+ relatesToSet1.add(value1);
+ value1 = "relates_to_entity_id_here_Second";
+ relatesToSet1.add(value1);
+ Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>();
+ relatesTo1.put(key1, relatesToSet1);
+ entity1.setRelatesToEntities(relatesTo1);
+
+ // add some config entries
+ Map<String, String> conf1 = new HashMap<String, String>();
+ conf1.put("cfg_param1", "value1");
+ conf1.put("cfg_param2", "value2");
+ entity1.addConfigs(conf1);
+
+ // add metrics
+ Set<TimelineMetric> metrics1 = new HashSet<>();
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId("MAP1_SLOT_MILLIS");
+ Map<Long, Number> metricValues1 = new HashMap<Long, Number>();
+ long ts1 = System.currentTimeMillis();
+ metricValues1.put(ts1 - 120000, 100000000);
+ metricValues1.put(ts1 - 100000, 200000000);
+ metricValues1.put(ts1 - 80000, 300000000);
+ metricValues1.put(ts1 - 60000, 400000000);
+ metricValues1.put(ts1 - 40000, 50000000000L);
+ metricValues1.put(ts1 - 20000, 60000000000L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues1);
+ metrics1.add(m2);
+ entity1.addMetrics(metrics1);
+ te1.addEntity(entity1);
+
+ TimelineEntities te2 = new TimelineEntities();
+ TimelineEntity entity2 = new TimelineEntity();
+ String id2 = "application_1111111111_4444";
+ entity2.setId(id2);
+ entity2.setType(TimelineEntityType.YARN_APPLICATION.toString());
+ entity2.setCreatedTime(cTime);
+ entity2.setModifiedTime(mTime);
+ te2.addEntity(entity2);
+ HBaseTimelineWriterImpl hbi = null;
+ try {
+ hbi = new HBaseTimelineWriterImpl(util.getConfiguration());
+ hbi.init(util.getConfiguration());
+ hbi.start();
+ String cluster = "cluster1";
+ String user = "user1";
+ String flow = "some_flow_name";
+ String flowVersion = "AB7822C10F1111";
+ long runid = 1002345678919L;
+ String appName = "application_1111111111_2222";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ appName = "application_1111111111_3333";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+ appName = "application_1111111111_4444";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te2);
+ hbi.stop();
+ } finally {
+ if (hbi != null) {
+ hbi.stop();
+ hbi.close();
+ }
+ }
+ }
+
+ private static void loadEntities() throws IOException {
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entity = new TimelineEntity();
+ String id = "hello";
+ String type = "world";
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ Long mTime = 1425026901000L;
+ entity.setCreatedTime(cTime);
+ entity.setModifiedTime(mTime);
+ // add the info map in Timeline Entity
+ Map<String, Object> infoMap = new HashMap<String, Object>();
+ infoMap.put("infoMapKey1", "infoMapValue1");
+ infoMap.put("infoMapKey2", 10);
+ entity.addInfo(infoMap);
+ // add the isRelatedToEntity info
+ String key = "task";
+ String value = "is_related_to_entity_id_here";
+ Set<String> isRelatedToSet = new HashSet<String>();
+ isRelatedToSet.add(value);
+ Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+ isRelatedTo.put(key, isRelatedToSet);
+ entity.setIsRelatedToEntities(isRelatedTo);
+
+ // add the relatesTo info
+ key = "container";
+ value = "relates_to_entity_id_here";
+ Set<String> relatesToSet = new HashSet<String>();
+ relatesToSet.add(value);
+ value = "relates_to_entity_id_here_Second";
+ relatesToSet.add(value);
+ Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+ relatesTo.put(key, relatesToSet);
+ entity.setRelatesToEntities(relatesTo);
+
+ // add some config entries
+ Map<String, String> conf = new HashMap<String, String>();
+ conf.put("config_param1", "value1");
+ conf.put("config_param2", "value2");
+ conf.put("cfg_param1", "value3");
+ entity.addConfigs(conf);
+
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId("MAP_SLOT_MILLIS");
+ Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 120000, 100000000);
+ metricValues.put(ts - 100000, 200000000);
+ metricValues.put(ts - 80000, 300000000);
+ metricValues.put(ts - 60000, 400000000);
+ metricValues.put(ts - 40000, 50000000000L);
+ metricValues.put(ts - 20000, 60000000000L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ TimelineMetric m12 = new TimelineMetric();
+ m12.setId("MAP1_BYTES");
+ m12.addValue(ts, 50);
+ metrics.add(m12);
+ entity.addMetrics(metrics);
+ te.addEntity(entity);
+
+ TimelineEntity entity1 = new TimelineEntity();
+ String id1 = "hello1";
+ entity1.setId(id1);
+ entity1.setType(type);
+ entity1.setCreatedTime(cTime);
+ entity1.setModifiedTime(mTime);
+
+ // add the info map in Timeline Entity
+ Map<String, Object> infoMap1 = new HashMap<String, Object>();
+ infoMap1.put("infoMapKey1", "infoMapValue1");
+ infoMap1.put("infoMapKey2", 10);
+ entity1.addInfo(infoMap1);
+
+ // add the isRelatedToEntity info
+ String key1 = "task";
+ String value1 = "is_related_to_entity_id_here";
+ Set<String> isRelatedToSet1 = new HashSet<String>();
+ isRelatedToSet1.add(value1);
+ Map<String, Set<String>> isRelatedTo1 = new HashMap<String, Set<String>>();
+ isRelatedTo1.put(key, isRelatedToSet1);
+ entity1.setIsRelatedToEntities(isRelatedTo1);
+
+ // add the relatesTo info
+ key1 = "container";
+ value1 = "relates_to_entity_id_here";
+ Set<String> relatesToSet1 = new HashSet<String>();
+ relatesToSet1.add(value1);
+ value1 = "relates_to_entity_id_here_Second";
+ relatesToSet1.add(value1);
+ Map<String, Set<String>> relatesTo1 = new HashMap<String, Set<String>>();
+ relatesTo1.put(key1, relatesToSet1);
+ entity1.setRelatesToEntities(relatesTo1);
+
+ // add some config entries
+ Map<String, String> conf1 = new HashMap<String, String>();
+ conf1.put("cfg_param1", "value1");
+ conf1.put("cfg_param2", "value2");
+ entity1.addConfigs(conf1);
+
+ // add metrics
+ Set<TimelineMetric> metrics1 = new HashSet<>();
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId("MAP1_SLOT_MILLIS");
+ Map<Long, Number> metricValues1 = new HashMap<Long, Number>();
+ long ts1 = System.currentTimeMillis();
+ metricValues1.put(ts1 - 120000, 100000000);
+ metricValues1.put(ts1 - 100000, 200000000);
+ metricValues1.put(ts1 - 80000, 300000000);
+ metricValues1.put(ts1 - 60000, 400000000);
+ metricValues1.put(ts1 - 40000, 50000000000L);
+ metricValues1.put(ts1 - 20000, 60000000000L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues1);
+ metrics1.add(m2);
+ entity1.addMetrics(metrics1);
+ te.addEntity(entity1);
+
+ TimelineEntity entity2 = new TimelineEntity();
+ String id2 = "hello2";
+ entity2.setId(id2);
+ entity2.setType(type);
+ entity2.setCreatedTime(cTime);
+ entity2.setModifiedTime(mTime);
+ te.addEntity(entity2);
+ HBaseTimelineWriterImpl hbi = null;
+ try {
+ hbi = new HBaseTimelineWriterImpl(util.getConfiguration());
+ hbi.init(util.getConfiguration());
+ hbi.start();
+ String cluster = "cluster1";
+ String user = "user1";
+ String flow = "some_flow_name";
+ String flowVersion = "AB7822C10F1111";
+ long runid = 1002345678919L;
+ String appName = "application_1231111111_1111";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ hbi.stop();
+ } finally {
+ if (hbi != null) {
+ hbi.stop();
+ hbi.close();
+ }
+ }
+ }
+
+ @Before
+ public void init() throws Exception {
+ reader = new HBaseTimelineReaderImpl();
+ reader.init(util.getConfiguration());
+ reader.start();
+ }
+
+ @After
+ public void stop() throws Exception {
+ if (reader != null) {
+ reader.stop();
+ reader.close();
+ }
+ }
+
private static void matchMetrics(Map<Long, Number> m1, Map<Long, Number> m2) {
assertEquals(m1.size(), m2.size());
for (Map.Entry<Long, Number> entry : m2.entrySet()) {
@@ -163,15 +500,11 @@ public class TestHBaseTimelineStorage {
te.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
- HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
String cluster = "cluster_test_write_app";
String user = "user1";
String flow = "some_flow_name";
@@ -256,8 +589,8 @@ public class TestHBaseTimelineStorage {
matchMetrics(metricValues, metricMap);
// read the timeline entity using the reader this time
- TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
- entity.getType(), entity.getId(),
+ TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appId,
+ entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
@@ -290,10 +623,6 @@ public class TestHBaseTimelineStorage {
hbi.stop();
hbi.close();
}
- if (hbr != null) {
- hbr.stop();
- hbr.close();
- }
}
}
@@ -362,15 +691,11 @@ public class TestHBaseTimelineStorage {
te.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
- HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
String cluster = "cluster_test_write_entity";
String user = "user1";
String flow = "some_flow_name";
@@ -468,12 +793,13 @@ public class TestHBaseTimelineStorage {
assertEquals(17, colCount);
// read the timeline entity using the reader this time
- TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
- entity.getType(), entity.getId(),
+ TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
+ entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
- Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+ Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
appName, entity.getType(), null, null, null, null, null, null, null,
- null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+ null, null, null, null, null, null,
+ EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
@@ -505,10 +831,6 @@ public class TestHBaseTimelineStorage {
hbi.stop();
hbi.close();
}
- if (hbr != null) {
- hbr.stop();
- hbr.close();
- }
}
}
@@ -559,15 +881,11 @@ public class TestHBaseTimelineStorage {
entities.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
- HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
String cluster = "cluster_test_events";
String user = "user2";
String flow = "other_flow_name";
@@ -612,11 +930,11 @@ public class TestHBaseTimelineStorage {
}
// read the timeline entity using the reader this time
- TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
- entity.getType(), entity.getId(),
+ TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
+ entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
- TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
- entity.getType(), entity.getId(),
+ TimelineEntity e2 = reader.getEntity(user, cluster, null, null, appName,
+ entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertNotNull(e2);
@@ -641,10 +959,6 @@ public class TestHBaseTimelineStorage {
hbi.stop();
hbi.close();
}
- if (hbr != null) {
- hbr.stop();
- hbr.close();
- }
}
}
@@ -665,15 +979,11 @@ public class TestHBaseTimelineStorage {
entities.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
- HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
hbi.start();
- hbr = new HBaseTimelineReaderImpl();
- hbr.init(c1);
- hbr.start();
String cluster = "cluster_test_empty_eventkey";
String user = "user_emptyeventkey";
String flow = "other_flow_name";
@@ -726,12 +1036,13 @@ public class TestHBaseTimelineStorage {
assertEquals(1, rowCount);
// read the timeline entity using the reader this time
- TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
- entity.getType(), entity.getId(),
+ TimelineEntity e1 = reader.getEntity(user, cluster, flow, runid, appName,
+ entity.getType(), entity.getId(), null, null,
EnumSet.of(TimelineReader.Field.ALL));
- Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+ Set<TimelineEntity> es1 = reader.getEntities(user, cluster, flow, runid,
appName, entity.getType(), null, null, null, null, null, null, null,
- null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+ null, null, null, null, null, null,
+ EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
@@ -748,8 +1059,6 @@ public class TestHBaseTimelineStorage {
} finally {
hbi.stop();
hbi.close();
- hbr.stop();;
- hbr.close();
}
}
@@ -816,6 +1125,291 @@ public class TestHBaseTimelineStorage {
}
}
+ @Test
+ public void testReadEntities() throws Exception {
+ TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+ 1002345678919L, "application_1231111111_1111","world", "hello", null,
+ null, EnumSet.of(Field.ALL));
+ assertNotNull(e1);
+ assertEquals(3, e1.getConfigs().size());
+ assertEquals(1, e1.getIsRelatedToEntities().size());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+ null, null, null, null, null, null, null, null, null, null, null, null,
+ null, EnumSet.of(Field.ALL));
+ assertEquals(3, es1.size());
+ }
+
+ @Test
+ public void testReadEntitiesDefaultView() throws Exception {
+ TimelineEntity e1 =
+ reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+ "application_1231111111_1111","world", "hello", null, null, null);
+ assertNotNull(e1);
+ assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
+ e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
+ e1.getRelatesToEntities().isEmpty());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+ null, null, null, null, null, null, null, null, null, null, null, null,
+ null, null);
+ assertEquals(3, es1.size());
+ for (TimelineEntity e : es1) {
+ assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
+ e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
+ e.getRelatesToEntities().isEmpty());
+ }
+ }
+
+ @Test
+ public void testReadEntitiesByFields() throws Exception {
+ TimelineEntity e1 =
+ reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+ "application_1231111111_1111","world", "hello", null, null,
+ EnumSet.of(Field.INFO, Field.CONFIGS));
+ assertNotNull(e1);
+ assertEquals(3, e1.getConfigs().size());
+ assertEquals(0, e1.getIsRelatedToEntities().size());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+ null, null, null, null, null, null, null, null, null, null, null, null,
+ null, EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
+ assertEquals(3, es1.size());
+ int metricsCnt = 0;
+ int isRelatedToCnt = 0;
+ int infoCnt = 0;
+ for (TimelineEntity entity : es1) {
+ metricsCnt += entity.getMetrics().size();
+ isRelatedToCnt += entity.getIsRelatedToEntities().size();
+ infoCnt += entity.getInfo().size();
+ }
+ assertEquals(0, infoCnt);
+ assertEquals(2, isRelatedToCnt);
+ assertEquals(3, metricsCnt);
+ }
+
+ @Test
+ public void testReadEntitiesConfigPrefix() throws Exception {
+ TimelineFilterList list =
+ new TimelineFilterList(Operator.OR,
+ new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+ TimelineEntity e1 =
+ reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+ "application_1231111111_1111","world", "hello", list, null, null);
+ assertNotNull(e1);
+ assertEquals(1, e1.getConfigs().size());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+ null, null, null, null, null, null, null, null, null, null, null,
+ list, null, null);
+ int cfgCnt = 0;
+ for (TimelineEntity entity : es1) {
+ cfgCnt += entity.getConfigs().size();
+ }
+ assertEquals(3, cfgCnt);
+ }
+
+ @Test
+ public void testReadEntitiesConfigFilterPrefix() throws Exception {
+ Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1");
+ TimelineFilterList list =
+ new TimelineFilterList(Operator.OR,
+ new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+ Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+ null, null, null, null, null, null, null, null, confFilters, null, null,
+ list, null, null);
+ assertEquals(1, entities.size());
+ int cfgCnt = 0;
+ for (TimelineEntity entity : entities) {
+ cfgCnt += entity.getConfigs().size();
+ }
+ assertEquals(2, cfgCnt);
+ }
+
+ @Test
+ public void testReadEntitiesMetricPrefix() throws Exception {
+ TimelineFilterList list =
+ new TimelineFilterList(Operator.OR,
+ new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+ TimelineEntity e1 =
+ reader.getEntity("user1", "cluster1", "some_flow_name", 1002345678919L,
+ "application_1231111111_1111","world", "hello", null, list, null);
+ assertNotNull(e1);
+ assertEquals(1, e1.getMetrics().size());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+ null, null, null, null, null, null, null, null, null, null, null, null,
+ list, null);
+ int metricCnt = 0;
+ for (TimelineEntity entity : es1) {
+ metricCnt += entity.getMetrics().size();
+ }
+ assertEquals(2, metricCnt);
+ }
+
+ @Test
+ public void testReadEntitiesMetricFilterPrefix() throws Exception {
+ Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS");
+ TimelineFilterList list =
+ new TimelineFilterList(Operator.OR,
+ new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+ Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, "application_1231111111_1111","world",
+ null, null, null, null, null, null, null, null, null, metricFilters,
+ null, null, list, null);
+ assertEquals(1, entities.size());
+ int metricCnt = 0;
+ for (TimelineEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ }
+ assertEquals(1, metricCnt);
+ }
+
+ @Test
+ public void testReadApps() throws Exception {
+ TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+ 1002345678919L, "application_1111111111_2222",
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null,
+ EnumSet.of(Field.ALL));
+ assertNotNull(e1);
+ assertEquals(3, e1.getConfigs().size());
+ assertEquals(1, e1.getIsRelatedToEntities().size());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+ null, null, null, null, null, null, null, null, null,
+ EnumSet.of(Field.ALL));
+ assertEquals(3, es1.size());
+ }
+
+ @Test
+ public void testReadAppsDefaultView() throws Exception {
+ TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+ 1002345678919L, "application_1111111111_2222",
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null);
+ assertNotNull(e1);
+ assertTrue(e1.getInfo().isEmpty() && e1.getConfigs().isEmpty() &&
+ e1.getMetrics().isEmpty() && e1.getIsRelatedToEntities().isEmpty() &&
+ e1.getRelatesToEntities().isEmpty());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+ null, null, null, null, null, null, null, null, null, null);
+ assertEquals(3, es1.size());
+ for (TimelineEntity e : es1) {
+ assertTrue(e.getInfo().isEmpty() && e.getConfigs().isEmpty() &&
+ e.getMetrics().isEmpty() && e.getIsRelatedToEntities().isEmpty() &&
+ e.getRelatesToEntities().isEmpty());
+ }
+ }
+
+ @Test
+ public void testReadAppsByFields() throws Exception {
+ TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+ 1002345678919L, "application_1111111111_2222",
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null,
+ EnumSet.of(Field.INFO, Field.CONFIGS));
+ assertNotNull(e1);
+ assertEquals(3, e1.getConfigs().size());
+ assertEquals(0, e1.getIsRelatedToEntities().size());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+ null, null, null, null, null, null, null, null, null,
+ EnumSet.of(Field.IS_RELATED_TO, Field.METRICS));
+ assertEquals(3, es1.size());
+ int metricsCnt = 0;
+ int isRelatedToCnt = 0;
+ int infoCnt = 0;
+ for (TimelineEntity entity : es1) {
+ metricsCnt += entity.getMetrics().size();
+ isRelatedToCnt += entity.getIsRelatedToEntities().size();
+ infoCnt += entity.getInfo().size();
+ }
+ assertEquals(0, infoCnt);
+ assertEquals(2, isRelatedToCnt);
+ assertEquals(3, metricsCnt);
+ }
+
+ @Test
+ public void testReadAppsConfigPrefix() throws Exception {
+ TimelineFilterList list =
+ new TimelineFilterList(Operator.OR,
+ new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+ TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+ 1002345678919L, "application_1111111111_2222",
+ TimelineEntityType.YARN_APPLICATION.toString(), null, list, null, null);
+ assertNotNull(e1);
+ assertEquals(1, e1.getConfigs().size());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+ null, null, null, null, null, null, null, list, null, null);
+ int cfgCnt = 0;
+ for (TimelineEntity entity : es1) {
+ cfgCnt += entity.getConfigs().size();
+ }
+ assertEquals(3, cfgCnt);
+ }
+
+ @Test
+ public void testReadAppsConfigFilterPrefix() throws Exception {
+ Map<String, String> confFilters = ImmutableMap.of("cfg_param1","value1");
+ TimelineFilterList list =
+ new TimelineFilterList(Operator.OR,
+ new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "cfg_"));
+ Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+ null, null, null, null, confFilters, null, null, list, null, null);
+ assertEquals(1, entities.size());
+ int cfgCnt = 0;
+ for (TimelineEntity entity : entities) {
+ cfgCnt += entity.getConfigs().size();
+ }
+ assertEquals(2, cfgCnt);
+ }
+
+ @Test
+ public void testReadAppsMetricPrefix() throws Exception {
+ TimelineFilterList list =
+ new TimelineFilterList(Operator.OR,
+ new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+ TimelineEntity e1 = reader.getEntity("user1", "cluster1", "some_flow_name",
+ 1002345678919L, "application_1111111111_2222",
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, list, null);
+ assertNotNull(e1);
+ assertEquals(1, e1.getMetrics().size());
+ Set<TimelineEntity> es1 = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+ null, null, null, null, null, null, null, null, list, null);
+ int metricCnt = 0;
+ for (TimelineEntity entity : es1) {
+ metricCnt += entity.getMetrics().size();
+ }
+ assertEquals(2, metricCnt);
+ }
+
+ @Test
+ public void testReadAppsMetricFilterPrefix() throws Exception {
+ TimelineFilterList list =
+ new TimelineFilterList(Operator.OR,
+ new TimelinePrefixFilter(TimelineCompareOp.EQUAL, "MAP1_"));
+ Set<String> metricFilters = ImmutableSet.of("MAP1_SLOT_MILLIS");
+ Set<TimelineEntity> entities = reader.getEntities("user1", "cluster1",
+ "some_flow_name", 1002345678919L, null,
+ TimelineEntityType.YARN_APPLICATION.toString(), null, null, null, null,
+ null, null, null, null, null, metricFilters, null, null, list, null);
+ int metricCnt = 0;
+ assertEquals(1, entities.size());
+ for (TimelineEntity entity : entities) {
+ metricCnt += entity.getMetrics().size();
+ }
+ assertEquals(1, metricCnt);
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index c957dad..434adac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -182,7 +182,7 @@ public class TestHBaseStorageFlowActivity {
Set<TimelineEntity> entities =
hbr.getEntities(null, cluster, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
- null, null, null, null, null, null, null, null, null);
+ null, null, null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity e : entities) {
FlowActivityEntity flowActivity = (FlowActivityEntity)e;
@@ -238,7 +238,7 @@ public class TestHBaseStorageFlowActivity {
Set<TimelineEntity> entities =
hbr.getEntities(user, cluster, flow, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
- null, null, null, null, null, null, null, null, null);
+ null, null, null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity e : entities) {
FlowActivityEntity entity = (FlowActivityEntity)e;
@@ -353,7 +353,7 @@ public class TestHBaseStorageFlowActivity {
Set<TimelineEntity> entities =
hbr.getEntities(null, cluster, null, null, null,
TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L, null, null,
- null, null, null, null, null, null, null, null, null);
+ null, null, null, null, null, null, null, null, null, null, null);
assertEquals(1, entities.size());
for (TimelineEntity e : entities) {
FlowActivityEntity flowActivity = (FlowActivityEntity)e;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 4fb8f0e..5da0192 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
@@ -44,9 +45,13 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -178,9 +183,8 @@ public class TestHBaseStorageFlowRun {
hbr.init(c1);
hbr.start();
// get the flow run entity
- TimelineEntity entity =
- hbr.getEntity(user, cluster, flow, runid, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+ TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
+ TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null);
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
FlowRunEntity flowRun = (FlowRunEntity)entity;
assertEquals(minStartTs, flowRun.getStartTime());
@@ -238,9 +242,8 @@ public class TestHBaseStorageFlowRun {
hbr = new HBaseTimelineReaderImpl();
hbr.init(c1);
hbr.start();
- TimelineEntity entity =
- hbr.getEntity(user, cluster, flow, runid, null,
- TimelineEntityType.YARN_FLOW_RUN.toString(), null, null);
+ TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
+ TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null, null);
assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
Set<TimelineMetric> metrics = entity.getMetrics();
assertEquals(2, metrics.size());
@@ -305,6 +308,181 @@ public class TestHBaseStorageFlowRun {
assertEquals(1, rowCount);
}
+ @Test
+ public void testWriteFlowRunMetricsPrefix() throws Exception {
+ String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+ String user = "testWriteFlowRunMetricsOneFlow_user1";
+ String flow = "testing_flowRun_metrics_flow_name";
+ String flowVersion = "CF7022C10F1354";
+ long runid = 1002345678919L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ te.addEntity(entityApp1);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ String appName = "application_11111111111111_1111";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ // write another application with same metric to this flow
+ te = new TimelineEntities();
+ TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ te.addEntity(entityApp2);
+ appName = "application_11111111111111_2222";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+
+ // check flow run
+ checkFlowRunTable(cluster, user, flow, runid, c1);
+
+ // use the timeline reader to verify data
+ HBaseTimelineReaderImpl hbr = null;
+ try {
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
+ TimelineFilterList metricsToRetrieve =
+ new TimelineFilterList(new TimelinePrefixFilter(TimelineCompareOp.EQUAL,
+ metric1.substring(0, metric1.indexOf("_") + 1)));
+ TimelineEntity entity = hbr.getEntity(user, cluster, flow, runid, null,
+ TimelineEntityType.YARN_FLOW_RUN.toString(), null, null,
+ metricsToRetrieve, null);
+ assertTrue(TimelineEntityType.YARN_FLOW_RUN.matches(entity.getType()));
+ Set<TimelineMetric> metrics = entity.getMetrics();
+ assertEquals(1, metrics.size());
+ for (TimelineMetric metric : metrics) {
+ String id = metric.getId();
+ Map<Long, Number> values = metric.getValues();
+ assertEquals(1, values.size());
+ Number value = null;
+ for (Number n : values.values()) {
+ value = n;
+ }
+ switch (id) {
+ case metric1:
+ assertEquals(141L, value);
+ break;
+ default:
+ fail("unrecognized metric: " + id);
+ }
+ }
+
+ Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, runid,
+ null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
+ null, null, null, null, null, null, null, null, null,
+ metricsToRetrieve, null);
+ assertEquals(1, entities.size());
+ for (TimelineEntity timelineEntity : entities) {
+ Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
+ assertEquals(1, timelineMetrics.size());
+ for (TimelineMetric metric : timelineMetrics) {
+ String id = metric.getId();
+ Map<Long, Number> values = metric.getValues();
+ assertEquals(1, values.size());
+ Number value = null;
+ for (Number n : values.values()) {
+ value = n;
+ }
+ switch (id) {
+ case metric1:
+ assertEquals(141L, value);
+ break;
+ default:
+ fail("unrecognized metric: " + id);
+ }
+ }
+ }
+ } finally {
+ hbr.close();
+ }
+ }
+
+ @Test
+ public void testWriteFlowRunsMetricFields() throws Exception {
+ String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+ String user = "testWriteFlowRunMetricsOneFlow_user1";
+ String flow = "testing_flowRun_metrics_flow_name";
+ String flowVersion = "CF7022C10F1354";
+ long runid = 1002345678919L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ te.addEntity(entityApp1);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ String appName = "application_11111111111111_1111";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ // write another application with same metric to this flow
+ te = new TimelineEntities();
+ TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ te.addEntity(entityApp2);
+ appName = "application_11111111111111_2222";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+
+ // check flow run
+ checkFlowRunTable(cluster, user, flow, runid, c1);
+
+ // use the timeline reader to verify data
+ HBaseTimelineReaderImpl hbr = null;
+ try {
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
+ Set<TimelineEntity> entities = hbr.getEntities(user, cluster, flow, runid,
+ null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
+ null, null, null, null, null, null, null, null, null, null, null);
+ assertEquals(1, entities.size());
+ for (TimelineEntity timelineEntity : entities) {
+ assertEquals(0, timelineEntity.getMetrics().size());
+ }
+
+ entities = hbr.getEntities(user, cluster, flow, runid,
+ null, TimelineEntityType.YARN_FLOW_RUN.toString(), null, null, null,
+ null, null, null, null, null, null, null, null, null,
+ null, EnumSet.of(Field.METRICS));
+ assertEquals(1, entities.size());
+ for (TimelineEntity timelineEntity : entities) {
+ Set<TimelineMetric> timelineMetrics = timelineEntity.getMetrics();
+ assertEquals(2, timelineMetrics.size());
+ for (TimelineMetric metric : timelineMetrics) {
+ String id = metric.getId();
+ Map<Long, Number> values = metric.getValues();
+ assertEquals(1, values.size());
+ Number value = null;
+ for (Number n : values.values()) {
+ value = n;
+ }
+ switch (id) {
+ case metric1:
+ assertEquals(141L, value);
+ break;
+ case metric2:
+ assertEquals(57L, value);
+ break;
+ default:
+ fail("unrecognized metric: " + id);
+ }
+ }
+ }
+ } finally {
+ hbr.close();
+ }
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
[2/2] hadoop git commit: YARN-3862. Support for fetching specific
configs and metrics based on prefixes (Varun Saxena via sjlee)
Posted by sj...@apache.org.
YARN-3862. Support for fetching specific configs and metrics based on prefixes (Varun Saxena via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cdb96df9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cdb96df9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cdb96df9
Branch: refs/heads/feature-YARN-2928
Commit: cdb96df97377ce8ee99c696e4fadfd889effa4e2
Parents: 7c86163
Author: Sangjin Lee <sj...@apache.org>
Authored: Tue Dec 1 21:47:43 2015 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Tue Dec 1 21:47:43 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../reader/TimelineReaderManager.java | 4 +-
.../reader/filter/TimelineCompareFilter.java | 61 ++
.../reader/filter/TimelineCompareOp.java | 36 +
.../reader/filter/TimelineFilter.java | 56 ++
.../reader/filter/TimelineFilterList.java | 91 +++
.../reader/filter/TimelineFilterUtils.java | 120 ++++
.../reader/filter/TimelinePrefixFilter.java | 56 ++
.../reader/filter/package-info.java | 28 +
.../storage/ApplicationEntityReader.java | 123 +++-
.../storage/FileSystemTimelineReaderImpl.java | 9 +-
.../storage/FlowActivityEntityReader.java | 16 +-
.../storage/FlowRunEntityReader.java | 69 +-
.../storage/GenericEntityReader.java | 119 +++-
.../storage/HBaseTimelineReaderImpl.java | 11 +-
.../storage/TimelineEntityReader.java | 32 +-
.../storage/TimelineEntityReaderFactory.java | 23 +-
.../timelineservice/storage/TimelineReader.java | 32 +
.../application/ApplicationColumnPrefix.java | 18 +-
.../storage/common/ColumnPrefix.java | 29 +-
.../storage/entity/EntityColumnPrefix.java | 18 +-
.../storage/flow/FlowActivityColumnPrefix.java | 18 +-
.../storage/flow/FlowRunColumnPrefix.java | 18 +-
.../TestFileSystemTimelineReaderImpl.java | 42 +-
.../storage/TestHBaseTimelineStorage.java | 682 +++++++++++++++++--
.../flow/TestHBaseStorageFlowActivity.java | 6 +-
.../storage/flow/TestHBaseStorageFlowRun.java | 190 +++++-
27 files changed, 1761 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9192626..8277c30 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -143,6 +143,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-4053. Change the way metric values are stored in HBase Storage (Varun
Saxena via sjlee)
+ YARN-3862. Support for fetching specific configs and metrics based on
+ prefixes (Varun Saxena via sjlee)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
index 27a50d5..294b05b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
@@ -77,7 +77,7 @@ public class TimelineReaderManager extends AbstractService {
return reader.getEntities(userId, cluster, flowId, flowRunId, appId,
entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
- metricFilters, eventFilters, fieldsToRetrieve);
+ metricFilters, eventFilters, null, null, fieldsToRetrieve);
}
/**
@@ -91,6 +91,6 @@ public class TimelineReaderManager extends AbstractService {
String entityId, EnumSet<Field> fields) throws IOException {
String cluster = getClusterID(clusterId, getConfig());
return reader.getEntity(userId, cluster, flowId, flowRunId, appId,
- entityType, entityId, fields);
+ entityType, entityId, null, null, fields);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
new file mode 100644
index 0000000..14e7124
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareFilter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Filter class which represents filter to be applied based on key-value pair
+ * and the relation between them represented by different relational operators.
+ */
+@Private
+@Unstable
+public class TimelineCompareFilter extends TimelineFilter {
+
+ private TimelineCompareOp compareOp;
+ private String key;
+ private Object value;
+
+ public TimelineCompareFilter() {
+ }
+
+ public TimelineCompareFilter(TimelineCompareOp op, String key, Object val) {
+ this.compareOp = op;
+ this.key = key;
+ this.value = val;
+ }
+
+ @Override
+ public TimelineFilterType getFilterType() {
+ return TimelineFilterType.COMPARE;
+ }
+
+ public TimelineCompareOp getCompareOp() {
+ return compareOp;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
new file mode 100644
index 0000000..461a7d8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineCompareOp.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Comparison Operators.
+ */
+@Private
+@Unstable
+public enum TimelineCompareOp {
+ LESS_THAN,
+ LESS_OR_EQUAL,
+ EQUAL,
+ NOT_EQUAL,
+ GREATER_OR_EQUAL,
+ GREATER_THAN
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
new file mode 100644
index 0000000..d4b4045
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Abstract base class extended to implement timeline filters.
+ */
+@Private
+@Unstable
+public abstract class TimelineFilter {
+
+ /**
+ * Lists the different filter types.
+ */
+ @Private
+ @Unstable
+ public enum TimelineFilterType {
+ /**
+ * Combines multiple filters.
+ */
+ LIST,
+ /**
+ * Filter which is used for comparison.
+ */
+ COMPARE,
+ /**
+ * Filter which matches prefix for a config or a metric.
+ */
+ PREFIX
+ }
+
+ public abstract TimelineFilterType getFilterType();
+
+ public String toString() {
+ return this.getClass().getSimpleName();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
new file mode 100644
index 0000000..8727bd7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterList.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Implementation of {@link TimelineFilter} that represents an ordered list of
+ * timeline filters which will then be evaluated with a specified boolean
+ * operator {@link Operator#AND} or {@link Operator#OR}. Since you can use
+ * timeline filter lists as children of timeline filter lists, you can create a
+ * hierarchy of filters to be evaluated.
+ */
+@Private
+@Unstable
+public class TimelineFilterList extends TimelineFilter {
+ /**
+ * Specifies how filters in the filter list will be evaluated. AND means all
+ * the filters should match and OR means atleast one should match.
+ */
+ @Private
+ @Unstable
+ public static enum Operator {
+ AND,
+ OR
+ }
+
+ private Operator operator;
+ private List<TimelineFilter> filterList = new ArrayList<TimelineFilter>();
+
+ public TimelineFilterList(TimelineFilter...filters) {
+ this(Operator.AND, filters);
+ }
+
+ public TimelineFilterList(Operator op, TimelineFilter...filters) {
+ this.operator = op;
+ this.filterList = new ArrayList<TimelineFilter>(Arrays.asList(filters));
+ }
+
+ @Override
+ public TimelineFilterType getFilterType() {
+ return TimelineFilterType.LIST;
+ }
+
+ /**
+ * Get the filter list.
+ *
+ * @return filterList
+ */
+ public List<TimelineFilter> getFilterList() {
+ return filterList;
+ }
+
+ /**
+ * Get the operator.
+ *
+ * @return operator
+ */
+ public Operator getOperator() {
+ return operator;
+ }
+
+ public void setOperator(Operator op) {
+ operator = op;
+ }
+
+ public void addFilter(TimelineFilter filter) {
+ filterList.add(filter);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
new file mode 100644
index 0000000..da3c383
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+
+/**
+ * Set of utility methods used by timeline filter classes.
+ */
+public final class TimelineFilterUtils {
+
+ private TimelineFilterUtils() {
+ }
+
+ /**
+ * Returns the equivalent HBase filter list's {@link Operator}.
+ * @param op
+ * @return HBase filter list's Operator.
+ */
+ private static Operator getHBaseOperator(TimelineFilterList.Operator op) {
+ switch (op) {
+ case AND:
+ return Operator.MUST_PASS_ALL;
+ case OR:
+ return Operator.MUST_PASS_ONE;
+ default:
+ throw new IllegalArgumentException("Invalid operator");
+ }
+ }
+
+ /**
+ * Returns the equivalent HBase compare filter's {@link CompareOp}.
+ * @param op
+ * @return HBase compare filter's CompareOp.
+ */
+ private static CompareOp getHBaseCompareOp(
+ TimelineCompareOp op) {
+ switch (op) {
+ case LESS_THAN:
+ return CompareOp.LESS;
+ case LESS_OR_EQUAL:
+ return CompareOp.LESS_OR_EQUAL;
+ case EQUAL:
+ return CompareOp.EQUAL;
+ case NOT_EQUAL:
+ return CompareOp.NOT_EQUAL;
+ case GREATER_OR_EQUAL:
+ return CompareOp.GREATER_OR_EQUAL;
+ case GREATER_THAN:
+ return CompareOp.GREATER;
+ default:
+ throw new IllegalArgumentException("Invalid compare operator");
+ }
+ }
+
+ /**
+ * Converts a {@link TimelinePrefixFilter} to an equivalent HBase
+ * {@link QualifierFilter}.
+ * @param colPrefix
+ * @param filter
+ * @return a {@link QualifierFilter} object
+ */
+ private static <T> Filter createHBaseColQualPrefixFilter(
+ ColumnPrefix<T> colPrefix, TimelinePrefixFilter filter) {
+ return new QualifierFilter(getHBaseCompareOp(filter.getCompareOp()),
+ new BinaryPrefixComparator(
+ colPrefix.getColumnPrefixBytes(filter.getPrefix())));
+ }
+
+ /**
+ * Creates equivalent HBase {@link FilterList} from {@link TimelineFilterList}
+ * while converting different timeline filters(of type {@link TimelineFilter})
+ * into their equivalent HBase filters.
+ * @param colPrefix
+ * @param filterList
+ * @return a {@link FilterList} object
+ */
+ public static <T> FilterList createHBaseFilterList(ColumnPrefix<T> colPrefix,
+ TimelineFilterList filterList) {
+ FilterList list =
+ new FilterList(getHBaseOperator(filterList.getOperator()));
+ for (TimelineFilter filter : filterList.getFilterList()) {
+ switch(filter.getFilterType()) {
+ case LIST:
+ list.addFilter(
+ createHBaseFilterList(colPrefix, (TimelineFilterList)filter));
+ break;
+ case PREFIX:
+ list.addFilter(createHBaseColQualPrefixFilter(
+ colPrefix, (TimelinePrefixFilter)filter));
+ break;
+ default:
+ break;
+ }
+ }
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
new file mode 100644
index 0000000..6233f26
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelinePrefixFilter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Filter class which represents filter to be applied based on prefixes.
+ * Prefixes can either match or not match.
+ */
+@Private
+@Unstable
+public class TimelinePrefixFilter extends TimelineFilter {
+
+ private TimelineCompareOp compareOp;
+ private String prefix;
+
+ public TimelinePrefixFilter(TimelineCompareOp op, String prefix) {
+ this.prefix = prefix;
+ if (op != TimelineCompareOp.EQUAL && op != TimelineCompareOp.NOT_EQUAL) {
+ throw new IllegalArgumentException("CompareOp for prefix filter should " +
+ "be EQUAL or NOT_EQUAL");
+ }
+ this.compareOp = op;
+ }
+
+ @Override
+ public TimelineFilterType getFilterType() {
+ return TimelineFilterType.PREFIX;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+
+ public TimelineCompareOp getCompareOp() {
+ return compareOp;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
new file mode 100644
index 0000000..f7c0705
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.server.timelineservice.reader.filter stores
+ * timeline filter implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.reader.filter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
index 8324afd..7082a5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -28,11 +28,21 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
@@ -56,18 +66,21 @@ class ApplicationEntityReader extends GenericEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve, true);
+ eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
+ true);
}
public ApplicationEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
- fieldsToRetrieve);
+ confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
}
/**
@@ -78,13 +91,95 @@ class ApplicationEntityReader extends GenericEntityReader {
}
@Override
- protected Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException {
+ protected FilterList constructFilterListBasedOnFields() {
+ FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+ // Fetch all the columns.
+ if (fieldsToRetrieve.contains(Field.ALL) &&
+ (confsToRetrieve == null ||
+ confsToRetrieve.getFilterList().isEmpty()) &&
+ (metricsToRetrieve == null ||
+ metricsToRetrieve.getFilterList().isEmpty())) {
+ return list;
+ }
+ FilterList infoColFamilyList = new FilterList();
+ // By default fetch everything in INFO column family.
+ FamilyFilter infoColumnFamily =
+ new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
+ infoColFamilyList.addFilter(infoColumnFamily);
+ // Events not required.
+ if (!fieldsToRetrieve.contains(Field.EVENTS) &&
+ !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+ }
+ // info not required.
+ if (!fieldsToRetrieve.contains(Field.INFO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
+ }
+ // is releated to not required.
+ if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+ }
+ // relates to not required.
+ if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+ }
+ list.addFilter(infoColFamilyList);
+ if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
+ (confsToRetrieve != null &&
+ !confsToRetrieve.getFilterList().isEmpty())) {
+ FilterList filterCfg =
+ new FilterList(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
+ if (confsToRetrieve != null &&
+ !confsToRetrieve.getFilterList().isEmpty()) {
+ filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ ApplicationColumnPrefix.CONFIG, confsToRetrieve));
+ }
+ list.addFilter(filterCfg);
+ }
+ if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
+ (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty())) {
+ FilterList filterMetrics =
+ new FilterList(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
+ if (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ ApplicationColumnPrefix.METRIC, metricsToRetrieve));
+ }
+ list.addFilter(filterMetrics);
+ }
+ return list;
+ }
+
+ @Override
+ protected Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
byte[] rowKey =
ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ get.setFilter(filterList);
+ }
return table.getResult(hbaseConf, conn, get);
}
@@ -115,6 +210,15 @@ class ApplicationEntityReader extends GenericEntityReader {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
}
+ if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
+ confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.CONFIGS);
+ }
+ if (!fieldsToRetrieve.contains(Field.METRICS) &&
+ metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.METRICS);
+ }
if (!singleEntityRead) {
if (limit == null || limit < 0) {
limit = TimelineReader.DEFAULT_LIMIT;
@@ -136,7 +240,7 @@ class ApplicationEntityReader extends GenericEntityReader {
@Override
protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException {
+ Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
if (flowRunId != null) {
scan.setRowPrefixFilter(ApplicationRowKey.
@@ -145,7 +249,12 @@ class ApplicationEntityReader extends GenericEntityReader {
scan.setRowPrefixFilter(ApplicationRowKey.
getRowKeyPrefix(clusterId, userId, flowId));
}
- scan.setFilter(new PageFilter(limit));
+ FilterList newList = new FilterList();
+ newList.addFilter(new PageFilter(limit));
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ newList.addFilter(filterList);
+ }
+ scan.setFilter(newList);
return table.getResultScanner(hbaseConf, conn, scan);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index 30d1d00..48bf844 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.JsonGenerationException;
@@ -272,6 +273,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException {
if (limit == null || limit <= 0) {
limit = DEFAULT_LIMIT;
@@ -386,7 +388,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService
@Override
public TimelineEntity getEntity(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) throws IOException {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve)
+ throws IOException {
String flowRunPath = getFlowRunPath(userId, clusterId, flowId,
flowRunId, appId);
File dir = new File(new File(rootPath, ENTITIES_DIR),
@@ -413,6 +417,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException {
String flowRunPath =
getFlowRunPath(userId, clusterId, flowId, flowRunId, appId);
@@ -422,6 +427,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
return getEntities(dir, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve);
+ eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
index 3e32128..71dd0a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@@ -58,14 +59,14 @@ class FlowActivityEntityReader extends TimelineEntityReader {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve, true);
+ eventFilters, null, null, fieldsToRetrieve, true);
}
public FlowActivityEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
String entityId, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
- fieldsToRetrieve);
+ null, null, fieldsToRetrieve);
}
/**
@@ -96,15 +97,20 @@ class FlowActivityEntityReader extends TimelineEntityReader {
}
@Override
- protected Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException {
+ protected FilterList constructFilterListBasedOnFields() {
+ return null;
+ }
+
+ @Override
+ protected Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
throw new UnsupportedOperationException(
"we don't support a single entity query");
}
@Override
protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException {
+ Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
if (createdTimeBegin == DEFAULT_BEGIN_TIME &&
createdTimeEnd == DEFAULT_END_TIME) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
index ebf2d27..1895fa6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
@@ -28,12 +28,22 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
@@ -54,18 +64,20 @@ class FlowRunEntityReader extends TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve, true);
+ eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
}
public FlowRunEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
- fieldsToRetrieve);
+ null, metricsToRetrieve, fieldsToRetrieve);
}
/**
@@ -101,26 +113,69 @@ class FlowRunEntityReader extends TimelineEntityReader {
if (createdTimeEnd == null) {
createdTimeEnd = DEFAULT_END_TIME;
}
+ if (!fieldsToRetrieve.contains(Field.METRICS) &&
+ metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.METRICS);
+ }
+ }
+ }
+
+ @Override
+ protected FilterList constructFilterListBasedOnFields() {
+ FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+
+ // By default fetch everything in INFO column family.
+ FamilyFilter infoColumnFamily =
+ new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
+ // Metrics not required.
+ if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) &&
+ !fieldsToRetrieve.contains(Field.ALL)) {
+ FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
+ infoColFamilyList.addFilter(infoColumnFamily);
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
+ list.addFilter(infoColFamilyList);
+ }
+ if (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ FilterList infoColFamilyList = new FilterList();
+ infoColFamilyList.addFilter(infoColumnFamily);
+ infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ FlowRunColumnPrefix.METRIC, metricsToRetrieve));
+ list.addFilter(infoColFamilyList);
}
+ return list;
}
@Override
- protected Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException {
+ protected Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
byte[] rowKey =
FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ get.setFilter(filterList);
+ }
return table.getResult(hbaseConf, conn, get);
}
@Override
protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException {
+ Connection conn, FilterList filterList) throws IOException {
Scan scan = new Scan();
scan.setRowPrefixFilter(
FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowId));
- scan.setFilter(new PageFilter(limit));
+ FilterList newList = new FilterList();
+ newList.addFilter(new PageFilter(limit));
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ newList.addFilter(filterList);
+ }
+ scan.setFilter(newList);
return table.getResultScanner(hbaseConf, conn, scan);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
index 04fc8ee..dcb8b89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -32,9 +32,18 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
@@ -46,6 +55,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -72,18 +82,21 @@ class GenericEntityReader extends TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, fieldsToRetrieve, sortedKeys);
+ eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
+ sortedKeys);
}
public GenericEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
- fieldsToRetrieve);
+ confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
}
/**
@@ -93,6 +106,85 @@ class GenericEntityReader extends TimelineEntityReader {
return ENTITY_TABLE;
}
+ @Override
+ protected FilterList constructFilterListBasedOnFields() {
+ FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+ // Fetch all the columns.
+ if (fieldsToRetrieve.contains(Field.ALL) &&
+ (confsToRetrieve == null ||
+ confsToRetrieve.getFilterList().isEmpty()) &&
+ (metricsToRetrieve == null ||
+ metricsToRetrieve.getFilterList().isEmpty())) {
+ return list;
+ }
+ FilterList infoColFamilyList = new FilterList();
+ // By default fetch everything in INFO column family.
+ FamilyFilter infoColumnFamily =
+ new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
+ infoColFamilyList.addFilter(infoColumnFamily);
+ // Events not required.
+ if (!fieldsToRetrieve.contains(Field.EVENTS) &&
+ !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
+ }
+ // info not required.
+ if (!fieldsToRetrieve.contains(Field.INFO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
+ }
+ // is related to not required.
+ if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
+ }
+ // relates to not required.
+ if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
+ !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
+ infoColFamilyList.addFilter(
+ new QualifierFilter(CompareOp.NOT_EQUAL,
+ new BinaryPrefixComparator(
+ EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
+ }
+ list.addFilter(infoColFamilyList);
+ if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
+ (confsToRetrieve != null &&
+ !confsToRetrieve.getFilterList().isEmpty())) {
+ FilterList filterCfg =
+ new FilterList(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
+ if (confsToRetrieve != null &&
+ !confsToRetrieve.getFilterList().isEmpty()) {
+ filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.CONFIG, confsToRetrieve));
+ }
+ list.addFilter(filterCfg);
+ }
+ if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
+ (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty())) {
+ FilterList filterMetrics =
+ new FilterList(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
+ if (metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
+ EntityColumnPrefix.METRIC, metricsToRetrieve));
+ }
+ list.addFilter(filterMetrics);
+ }
+ return list;
+ }
+
protected FlowContext lookupFlowContext(String clusterId, String appId,
Configuration hbaseConf, Connection conn) throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
@@ -145,6 +237,15 @@ class GenericEntityReader extends TimelineEntityReader {
if (fieldsToRetrieve == null) {
fieldsToRetrieve = EnumSet.noneOf(Field.class);
}
+ if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
+ confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.CONFIGS);
+ }
+ if (!fieldsToRetrieve.contains(Field.METRICS) &&
+ metricsToRetrieve != null &&
+ !metricsToRetrieve.getFilterList().isEmpty()) {
+ fieldsToRetrieve.add(Field.METRICS);
+ }
if (!singleEntityRead) {
if (limit == null || limit < 0) {
limit = TimelineReader.DEFAULT_LIMIT;
@@ -165,25 +266,31 @@ class GenericEntityReader extends TimelineEntityReader {
}
@Override
- protected Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException {
+ protected Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException {
byte[] rowKey =
EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId,
entityType, entityId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ get.setFilter(filterList);
+ }
return table.getResult(hbaseConf, conn, get);
}
@Override
protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException {
+ Connection conn, FilterList filterList) throws IOException {
// Scan through part of the table to find the entities belong to one app
// and one type
Scan scan = new Scan();
scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
clusterId, userId, flowId, flowRunId, appId, entityType));
scan.setMaxVersions(Integer.MAX_VALUE);
+ if (filterList != null && !filterList.getFilters().isEmpty()) {
+ scan.setFilter(filterList);
+ }
return table.getResultScanner(hbaseConf, conn, scan);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 889ae19..9e4b26a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
public class HBaseTimelineReaderImpl
extends AbstractService implements TimelineReader {
@@ -64,11 +65,13 @@ public class HBaseTimelineReaderImpl
@Override
public TimelineEntity getEntity(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve)
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve)
throws IOException {
TimelineEntityReader reader =
TimelineEntityReaderFactory.createSingleEntityReader(userId, clusterId,
- flowId, flowRunId, appId, entityType, entityId, fieldsToRetrieve);
+ flowId, flowRunId, appId, entityType, entityId, confsToRetrieve,
+ metricsToRetrieve, fieldsToRetrieve);
return reader.readEntity(hbaseConf, conn);
}
@@ -80,13 +83,15 @@ public class HBaseTimelineReaderImpl
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException {
TimelineEntityReader reader =
TimelineEntityReaderFactory.createMultipleEntitiesReader(userId,
clusterId, flowId, flowRunId, appId, entityType, limit,
createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
- metricFilters, eventFilters, fieldsToRetrieve);
+ metricFilters, eventFilters, confsToRetrieve, metricsToRetrieve,
+ fieldsToRetrieve);
return reader.readEntities(hbaseConf, conn);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
index adaf42e..7178aab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
@@ -31,8 +31,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
@@ -70,6 +72,8 @@ abstract class TimelineEntityReader {
protected Map<String, String> configFilters;
protected Set<String> metricFilters;
protected Set<String> eventFilters;
+ protected TimelineFilterList confsToRetrieve;
+ protected TimelineFilterList metricsToRetrieve;
/**
* Main table the entity reader uses.
@@ -94,6 +98,7 @@ abstract class TimelineEntityReader {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
this.singleEntityRead = false;
this.sortedKeys = sortedKeys;
@@ -115,6 +120,8 @@ abstract class TimelineEntityReader {
this.configFilters = configFilters;
this.metricFilters = metricFilters;
this.eventFilters = eventFilters;
+ this.confsToRetrieve = confsToRetrieve;
+ this.metricsToRetrieve = metricsToRetrieve;
this.table = getTable();
}
@@ -124,7 +131,8 @@ abstract class TimelineEntityReader {
*/
protected TimelineEntityReader(String userId, String clusterId,
String flowId, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityId, TimelineFilterList confsToRetrieve,
+ TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
this.singleEntityRead = true;
this.userId = userId;
this.clusterId = clusterId;
@@ -134,11 +142,21 @@ abstract class TimelineEntityReader {
this.entityType = entityType;
this.fieldsToRetrieve = fieldsToRetrieve;
this.entityId = entityId;
+ this.confsToRetrieve = confsToRetrieve;
+ this.metricsToRetrieve = metricsToRetrieve;
this.table = getTable();
}
/**
+ * Creates a {@link FilterList} based on fields, confs and metrics to
+ * retrieve. This filter list will be set in Scan/Get objects to trim down
+ * results fetched from HBase back-end storage.
+ * @return a {@link FilterList} object.
+ */
+ protected abstract FilterList constructFilterListBasedOnFields();
+
+ /**
* Reads and deserializes a single timeline entity from the HBase storage.
*/
public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
@@ -146,7 +164,8 @@ abstract class TimelineEntityReader {
validateParams();
augmentParams(hbaseConf, conn);
- Result result = getResult(hbaseConf, conn);
+ FilterList filterList = constructFilterListBasedOnFields();
+ Result result = getResult(hbaseConf, conn, filterList);
if (result == null || result.isEmpty()) {
// Could not find a matching row.
LOG.info("Cannot find matching entity of type " + entityType);
@@ -166,7 +185,8 @@ abstract class TimelineEntityReader {
augmentParams(hbaseConf, conn);
NavigableSet<TimelineEntity> entities = new TreeSet<>();
- ResultScanner results = getResults(hbaseConf, conn);
+ FilterList filterList = constructFilterListBasedOnFields();
+ ResultScanner results = getResults(hbaseConf, conn, filterList);
try {
for (Result result : results) {
TimelineEntity entity = parseEntity(result);
@@ -211,14 +231,14 @@ abstract class TimelineEntityReader {
*
* @return the {@link Result} instance or null if no such record is found.
*/
- protected abstract Result getResult(Configuration hbaseConf, Connection conn)
- throws IOException;
+ protected abstract Result getResult(Configuration hbaseConf, Connection conn,
+ FilterList filterList) throws IOException;
/**
* Fetches a {@link ResultScanner} for a multi-entity read.
*/
protected abstract ResultScanner getResults(Configuration hbaseConf,
- Connection conn) throws IOException;
+ Connection conn, FilterList filterList) throws IOException;
/**
* Given a {@link Result} instance, deserializes and creates a
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
index f5341c2..16204c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
/**
@@ -34,22 +35,23 @@ class TimelineEntityReaderFactory {
*/
public static TimelineEntityReader createSingleEntityReader(String userId,
String clusterId, String flowId, Long flowRunId, String appId,
- String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
+ String entityType, String entityId, TimelineFilterList confs,
+ TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) {
// currently the types that are handled separate from the generic entity
// table are application, flow run, and flow activity entities
if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
- appId, entityType, entityId, fieldsToRetrieve);
+ appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
- appId, entityType, entityId, fieldsToRetrieve);
+ appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, entityId, fieldsToRetrieve);
} else {
// assume we're dealing with a generic entity read
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
- appId, entityType, entityId, fieldsToRetrieve);
+ appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
}
}
@@ -64,6 +66,7 @@ class TimelineEntityReaderFactory {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confs, TimelineFilterList metrics,
EnumSet<Field> fieldsToRetrieve) {
// currently the types that are handled separate from the generic entity
// table are application, flow run, and flow activity entities
@@ -71,8 +74,8 @@ class TimelineEntityReaderFactory {
return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters,
- fieldsToRetrieve);
+ infoFilters, configFilters, metricFilters, eventFilters, confs,
+ metrics, fieldsToRetrieve);
} else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
@@ -83,15 +86,15 @@ class TimelineEntityReaderFactory {
return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters,
- fieldsToRetrieve);
+ infoFilters, configFilters, metricFilters, eventFilters, confs,
+ metrics, fieldsToRetrieve);
} else {
// assume we're dealing with a generic entity read
return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
appId, entityType, limit, createdTimeBegin, createdTimeEnd,
modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters,
- fieldsToRetrieve, false);
+ infoFilters, configFilters, metricFilters, eventFilters, confs,
+ metrics, fieldsToRetrieve, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
index e4e305e..0ed17da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineReader.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
/** ATSv2 reader interface. */
@Private
@@ -70,6 +72,18 @@ public interface TimelineReader extends Service {
* Entity type (mandatory)
* @param entityId
* Entity Id (mandatory)
+ * @param confsToRetrieve
+ * Used for deciding which configs to return in response. This is
+ * represented as a {@link TimelineFilterList} object containing
+ * {@link TimelinePrefixFilter} objects. These can either be exact config
+ * keys' or prefixes which are then compared against config keys' to decide
+ * configs to return in response.
+ * @param metricsToRetrieve
+ * Used for deciding which metrics to return in response. This is
+ * represented as a {@link TimelineFilterList} object containing
+ * {@link TimelinePrefixFilter} objects. These can either be exact metric
+ * ids' or prefixes which are then compared against metric ids' to decide
+ * metrics to return in response.
* @param fieldsToRetrieve
* Specifies which fields of the entity object to retrieve(optional), see
* {@link Field}. If null, retrieves 4 fields namely entity id,
@@ -81,6 +95,7 @@ public interface TimelineReader extends Service {
*/
TimelineEntity getEntity(String userId, String clusterId, String flowId,
Long flowRunId, String appId, String entityType, String entityId,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException;
/**
@@ -139,6 +154,22 @@ public interface TimelineReader extends Service {
* @param eventFilters
* Matched entities should contain the given events (optional). If null
* or empty, the filter is not applied.
+ * @param confsToRetrieve
+ * Used for deciding which configs to return in response. This is
+ * represented as a {@link TimelineFilterList} object containing
+ * {@link TimelinePrefixFilter} objects. These can either be exact config
+ * keys' or prefixes which are then compared against config keys' to decide
+ * configs(inside entities) to return in response. This should not be
+ * confused with configFilters which is used to decide which entities to
+ * return instead.
+ * @param metricsToRetrieve
+ * Used for deciding which metrics to return in response. This is
+ * represented as a {@link TimelineFilterList} object containing
+ * {@link TimelinePrefixFilter} objects. These can either be exact metric
+ * ids' or prefixes which are then compared against metric ids' to decide
+ * metrics(inside entities) to return in response. This should not be
+ * confused with metricFilters which is used to decide which entities to
+ * return instead.
* @param fieldsToRetrieve
* Specifies which fields of the entity object to retrieve(optional), see
* {@link Field}. If null, retrieves 4 fields namely entity id,
@@ -158,5 +189,6 @@ public interface TimelineReader extends Service {
Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
Map<String, Object> infoFilters, Map<String, String> configFilters,
Set<String> metricFilters, Set<String> eventFilters,
+ TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
EnumSet<Field> fieldsToRetrieve) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index b06f5c1..056e51f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -119,6 +119,18 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
return columnPrefix;
}
+ @Override
+ public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
/*
* (non-Javadoc)
*
@@ -139,8 +151,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
@@ -166,8 +177,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index db49098..0f3ac4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -44,13 +44,13 @@ public interface ColumnPrefix<T> {
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
- *@param attributes attributes for the mutation that are used by the coprocessor
- * to set/read the cell tags
+ * @param attributes attributes for the mutation that are used by the
+ * coprocessor to set/read the cell tags.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
- public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+ void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
byte[] qualifier, Long timestamp, Object inputValue,
Attribute... attributes) throws IOException;
@@ -65,13 +65,13 @@ public interface ColumnPrefix<T> {
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
- *@param attributes attributes for the mutation that are used by the coprocessor
- * to set/read the cell tags
+ * @param attributes attributes for the mutation that are used by the
+ * coprocessor to set/read the cell tags.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
- public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+ void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
String qualifier, Long timestamp, Object inputValue,
Attribute... attributes) throws IOException;
@@ -86,7 +86,7 @@ public interface ColumnPrefix<T> {
* in the result.
* @throws IOException
*/
- public Object readResult(Result result, String qualifier) throws IOException;
+ Object readResult(Result result, String qualifier) throws IOException;
/**
* @param result from which to read columns
@@ -94,7 +94,7 @@ public interface ColumnPrefix<T> {
* (or all of them if the prefix value is null).
* @throws IOException
*/
- public Map<String, Object> readResults(Result result) throws IOException;
+ Map<String, Object> readResults(Result result) throws IOException;
/**
* @param result from which to reads data with timestamps
@@ -104,7 +104,18 @@ public interface ColumnPrefix<T> {
* idB={timestamp3->value3}, idC={timestamp1->value4}}
* @throws IOException
*/
- public <V> NavigableMap<String, NavigableMap<Long, V>>
+ <V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException;
+ /**
+ * @param qualifierPrefix Column qualifier or prefix of qualifier.
+ * @return a byte array encoding column prefix and qualifier/prefix passed.
+ */
+ byte[] getColumnPrefixBytes(String qualifierPrefix);
+
+ /**
+ * @param qualifierPrefix Column qualifier or prefix of qualifier.
+ * @return a byte array encoding column prefix and qualifier/prefix passed.
+ */
+ byte[] getColumnPrefixBytes(byte[] qualifierPrefix);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cdb96df9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index abede9c..5b71228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -119,6 +119,18 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
return columnPrefix;
}
+ @Override
+ public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
+ @Override
+ public byte[] getColumnPrefixBytes(String qualifierPrefix) {
+ return ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifierPrefix);
+ }
+
/*
* (non-Javadoc)
*
@@ -140,8 +152,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);
@@ -167,8 +178,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
+ tableMutator.getName().getNameAsString());
}
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
+ byte[] columnQualifier = getColumnPrefixBytes(qualifier);
column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
attributes);