You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:21 UTC
[04/50] [abbrv] hadoop git commit: YARN-3049. [Storage
Implementation] Implement storage reader interface to fetch raw data from
HBase backend (Zhijie Shen via sjlee)
YARN-3049. [Storage Implementation] Implement storage reader interface to fetch raw data from HBase backend (Zhijie Shen via sjlee)
(cherry picked from commit 07433c2ad52df9e844dbd90020c277d3df844dcd)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0bed3fb3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0bed3fb3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0bed3fb3
Branch: refs/heads/feature-YARN-2928
Commit: 0bed3fb3b3793fc0fc4b838c024cf5da7c2cc291
Parents: 2d97f86
Author: Sangjin Lee <sj...@apache.org>
Authored: Fri Aug 7 10:00:22 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:37:46 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../dev-support/findbugs-exclude.xml | 5 +
.../records/timelineservice/TimelineEntity.java | 9 +-
.../storage/FileSystemTimelineReaderImpl.java | 164 +++----
.../storage/HBaseTimelineReaderImpl.java | 424 +++++++++++++++++++
.../storage/HBaseTimelineWriterImpl.java | 43 +-
.../storage/TimelineSchemaCreator.java | 12 +
.../storage/apptoflow/AppToFlowColumn.java | 126 ++++++
.../apptoflow/AppToFlowColumnFamily.java | 51 +++
.../storage/apptoflow/AppToFlowRowKey.java | 39 ++
.../storage/apptoflow/AppToFlowTable.java | 110 +++++
.../storage/apptoflow/package-info.java | 23 +
.../storage/common/BaseTable.java | 16 +
.../storage/common/ColumnPrefix.java | 2 +-
.../common/TimelineEntitySchemaConstants.java | 68 ---
.../common/TimelineHBaseSchemaConstants.java | 68 +++
.../storage/common/TimelineReaderUtils.java | 112 +++++
.../storage/entity/EntityColumn.java | 2 +-
.../storage/entity/EntityColumnFamily.java | 2 +-
.../storage/entity/EntityColumnPrefix.java | 2 +-
.../storage/entity/EntityRowKey.java | 36 +-
.../storage/entity/EntityTable.java | 8 +-
.../storage/TestHBaseTimelineWriterImpl.java | 82 +++-
23 files changed, 1197 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f40cfe8..0378df8 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -82,6 +82,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3949. Ensure timely flush of timeline writes. (Sangjin Lee via
junping_du)
+ YARN-3049. [Storage Implementation] Implement storage reader interface to
+ fetch raw data from HBase backend (Zhijie Shen via sjlee)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 7505645..0dcdd15 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -522,4 +522,9 @@
<Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
<Bug pattern="NP_BOOLEAN_RETURN_NULL" />
</Match>
+ <!-- Object cast is based on the event type -->
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
+ <Bug pattern="BC_UNCONFIRMED_CAST" />
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
index 9ef2d90..0701001 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -29,7 +29,9 @@ import javax.xml.bind.annotation.XmlRootElement;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
+import java.util.TreeSet;
/**
* The basic timeline entity data structure for timeline service v2. Timeline
@@ -133,7 +135,8 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
private HashMap<String, Object> info = new HashMap<>();
private HashMap<String, String> configs = new HashMap<>();
private Set<TimelineMetric> metrics = new HashSet<>();
- private Set<TimelineEvent> events = new HashSet<>();
+ // events should be sorted by timestamp in descending order
+ private NavigableSet<TimelineEvent> events = new TreeSet<>();
private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
private long createdTime;
@@ -334,7 +337,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
}
@XmlElement(name = "events")
- public Set<TimelineEvent> getEvents() {
+ public NavigableSet<TimelineEvent> getEvents() {
if (real == null) {
return events;
} else {
@@ -342,7 +345,7 @@ public class TimelineEntity implements Comparable<TimelineEntity> {
}
}
- public void setEvents(Set<TimelineEvent> events) {
+ public void setEvents(NavigableSet<TimelineEvent> events) {
if (real == null) {
this.events = events;
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index f9f1d1d..45ddd1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
@@ -119,59 +120,32 @@ public class FileSystemTimelineReaderImpl extends AbstractService
private static void fillFields(TimelineEntity finalEntity,
TimelineEntity real, EnumSet<Field> fields) {
if (fields.contains(Field.ALL)) {
- finalEntity.setConfigs(real.getConfigs());
- finalEntity.setMetrics(real.getMetrics());
- finalEntity.setInfo(real.getInfo());
- finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
- finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
- finalEntity.setEvents(real.getEvents());
- return;
+ fields = EnumSet.allOf(Field.class);
}
for (Field field : fields) {
switch(field) {
- case CONFIGS:
- finalEntity.setConfigs(real.getConfigs());
- break;
- case METRICS:
- finalEntity.setMetrics(real.getMetrics());
- break;
- case INFO:
- finalEntity.setInfo(real.getInfo());
- break;
- case IS_RELATED_TO:
- finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
- break;
- case RELATES_TO:
- finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
- break;
- case EVENTS:
- finalEntity.setEvents(real.getEvents());
- break;
- default:
- continue;
- }
- }
- }
-
- private static boolean matchFilter(Object infoValue, Object filterValue) {
- return infoValue.equals(filterValue);
- }
-
- private static boolean matchFilters(Map<String, ? extends Object> entityInfo,
- Map<String, ? extends Object> filters) {
- if (entityInfo == null || entityInfo.isEmpty()) {
- return false;
- }
- for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
- Object infoValue = entityInfo.get(filter.getKey());
- if (infoValue == null) {
- return false;
- }
- if (!matchFilter(infoValue, filter.getValue())) {
- return false;
+ case CONFIGS:
+ finalEntity.setConfigs(real.getConfigs());
+ break;
+ case METRICS:
+ finalEntity.setMetrics(real.getMetrics());
+ break;
+ case INFO:
+ finalEntity.setInfo(real.getInfo());
+ break;
+ case IS_RELATED_TO:
+ finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+ break;
+ case RELATES_TO:
+ finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
+ break;
+ case EVENTS:
+ finalEntity.setEvents(real.getEvents());
+ break;
+ default:
+ continue;
}
}
- return true;
}
private String getFlowRunPath(String userId, String clusterId, String flowId,
@@ -186,10 +160,10 @@ public class FileSystemTimelineReaderImpl extends AbstractService
String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
clusterId + "/" + APP_FLOW_MAPPING_FILE;
try (BufferedReader reader =
- new BufferedReader(new InputStreamReader(
- new FileInputStream(
- appFlowMappingFile), Charset.forName("UTF-8")));
- CSVParser parser = new CSVParser(reader, csvFormat)) {
+ new BufferedReader(new InputStreamReader(
+ new FileInputStream(
+ appFlowMappingFile), Charset.forName("UTF-8")));
+ CSVParser parser = new CSVParser(reader, csvFormat)) {
for (CSVRecord record : parser.getRecords()) {
if (record.size() < 4) {
continue;
@@ -207,36 +181,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
throw new IOException("Unable to get flow info");
}
- private static boolean matchMetricFilters(Set<TimelineMetric> metrics,
- Set<String> metricFilters) {
- Set<String> tempMetrics = new HashSet<String>();
- for (TimelineMetric metric : metrics) {
- tempMetrics.add(metric.getId());
- }
-
- for (String metricFilter : metricFilters) {
- if (!tempMetrics.contains(metricFilter)) {
- return false;
- }
- }
- return true;
- }
-
- private static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
- Set<String> eventFilters) {
- Set<String> tempEvents = new HashSet<String>();
- for (TimelineEvent event : entityEvents) {
- tempEvents.add(event.getId());
- }
-
- for (String eventFilter : eventFilters) {
- if (!tempEvents.contains(eventFilter)) {
- return false;
- }
- }
- return true;
- }
-
private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
EnumSet<Field> fieldsToRetrieve) {
TimelineEntity entityToBeReturned = new TimelineEntity();
@@ -254,23 +198,6 @@ public class FileSystemTimelineReaderImpl extends AbstractService
return (time >= timeBegin) && (time <= timeEnd);
}
- private static boolean matchRelations(
- Map<String, Set<String>> entityRelations,
- Map<String, Set<String>> relations) {
- for (Map.Entry<String, Set<String>> relation : relations.entrySet()) {
- Set<String> ids = entityRelations.get(relation.getKey());
- if (ids == null) {
- return false;
- }
- for (String id : relation.getValue()) {
- if (!ids.contains(id)) {
- return false;
- }
- }
- }
- return true;
- }
-
private static void mergeEntities(TimelineEntity entity1,
TimelineEntity entity2) {
// Ideally created time wont change except in the case of issue from client.
@@ -364,22 +291,22 @@ public class FileSystemTimelineReaderImpl extends AbstractService
// First sort the selected entities based on created/start time.
Map<Long, Set<TimelineEntity>> sortedEntities =
new TreeMap<>(
- new Comparator<Long>() {
- @Override
- public int compare(Long l1, Long l2) {
- return l2.compareTo(l1);
+ new Comparator<Long>() {
+ @Override
+ public int compare(Long l1, Long l2) {
+ return l2.compareTo(l1);
+ }
}
- }
);
for (File entityFile : dir.listFiles()) {
if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
continue;
}
try (BufferedReader reader =
- new BufferedReader(
- new InputStreamReader(
- new FileInputStream(
- entityFile), Charset.forName("UTF-8")))) {
+ new BufferedReader(
+ new InputStreamReader(
+ new FileInputStream(
+ entityFile), Charset.forName("UTF-8")))) {
TimelineEntity entity = readEntityFromFile(reader);
if (!entity.getType().equals(entityType)) {
continue;
@@ -393,27 +320,32 @@ public class FileSystemTimelineReaderImpl extends AbstractService
continue;
}
if (relatesTo != null && !relatesTo.isEmpty() &&
- !matchRelations(entity.getRelatesToEntities(), relatesTo)) {
+ !TimelineReaderUtils
+ .matchRelations(entity.getRelatesToEntities(), relatesTo)) {
continue;
}
if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
- !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
+ !TimelineReaderUtils
+ .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
continue;
}
if (infoFilters != null && !infoFilters.isEmpty() &&
- !matchFilters(entity.getInfo(), infoFilters)) {
+ !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
continue;
}
if (configFilters != null && !configFilters.isEmpty() &&
- !matchFilters(entity.getConfigs(), configFilters)) {
+ !TimelineReaderUtils.matchFilters(
+ entity.getConfigs(), configFilters)) {
continue;
}
if (metricFilters != null && !metricFilters.isEmpty() &&
- !matchMetricFilters(entity.getMetrics(), metricFilters)) {
+ !TimelineReaderUtils.matchMetricFilters(
+ entity.getMetrics(), metricFilters)) {
continue;
}
if (eventFilters != null && !eventFilters.isEmpty() &&
- !matchEventFilters(entity.getEvents(), eventFilters)) {
+ !TimelineReaderUtils.matchEventFilters(
+ entity.getEvents(), eventFilters)) {
continue;
}
TimelineEntity entityToBeReturned =
@@ -461,8 +393,8 @@ public class FileSystemTimelineReaderImpl extends AbstractService
File entityFile =
new File(dir, entityId + TIMELINE_SERVICE_STORAGE_EXTENSION);
try (BufferedReader reader =
- new BufferedReader(new InputStreamReader(
- new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+ new BufferedReader(new InputStreamReader(
+ new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
TimelineEntity entity = readEntityFromFile(reader);
return createEntityToBeReturned(entity, fieldsToRetrieve);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
new file mode 100644
index 0000000..5258b9c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class HBaseTimelineReaderImpl
+ extends AbstractService implements TimelineReader {
+
+ private static final Log LOG = LogFactory
+ .getLog(HBaseTimelineReaderImpl.class);
+ private static final long DEFAULT_BEGIN_TIME = 0L;
+ private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
+ private Configuration hbaseConf = null;
+ private Connection conn;
+ private EntityTable entityTable;
+ private AppToFlowTable appToFlowTable;
+
+  /**
+   * Creates the reader service, naming it after this class's fully qualified
+   * class name. No HBase resources are acquired here; that happens in
+   * {@link #serviceInit(Configuration)}.
+   */
+  public HBaseTimelineReaderImpl() {
+    super(HBaseTimelineReaderImpl.class.getName());
+  }
+
+  /**
+   * Initializes the service: runs the parent initialization, derives an HBase
+   * configuration from the given configuration, opens the HBase connection
+   * and creates the two table accessors used by the read paths.
+   *
+   * @param conf the service configuration
+   * @throws Exception if the parent initialization or the connection setup
+   *     fails
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    this.hbaseConf = HBaseConfiguration.create(conf);
+    this.conn = ConnectionFactory.createConnection(this.hbaseConf);
+    this.entityTable = new EntityTable();
+    this.appToFlowTable = new AppToFlowTable();
+  }
+
+  /**
+   * Stops the service, closing the HBase connection if one was opened.
+   *
+   * @throws Exception if closing the connection or the parent shutdown fails
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      if (conn != null) {
+        LOG.info("closing the hbase Connection");
+        conn.close();
+      }
+    } finally {
+      // Always let the parent complete its own shutdown, even when closing
+      // the connection failed.
+      super.serviceStop();
+    }
+  }
+
+  /**
+   * Fetches a single entity by its full row key.
+   *
+   * When either {@code flowId} or {@code flowRunId} is missing, both are
+   * resolved from the app-to-flow table. A {@code null} field set is treated
+   * as "no optional fields".
+   *
+   * @return the parsed entity, or {@code null} when the row is absent or empty
+   * @throws IOException if the flow context cannot be resolved or the read
+   *     fails
+   */
+  @Override
+  public TimelineEntity getEntity(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve)
+      throws IOException {
+    validateParams(userId, clusterId, appId, entityType, entityId, true);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext flowContext = lookupFlowContext(clusterId, appId);
+      flowId = flowContext.flowId;
+      flowRunId = flowContext.flowRunId;
+    }
+    EnumSet<Field> fields = (fieldsToRetrieve != null)
+        ? fieldsToRetrieve : EnumSet.noneOf(Field.class);
+
+    Get get = new Get(EntityRowKey.getRowKey(
+        clusterId, userId, flowId, flowRunId, appId, entityType, entityId));
+    get.setMaxVersions(Integer.MAX_VALUE);
+    Result row = entityTable.getResult(hbaseConf, conn, get);
+    // A single-entity lookup applies neither time-range checks nor filters.
+    return parseEntity(row, fields,
+        false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME,
+        false, DEFAULT_BEGIN_TIME, DEFAULT_END_TIME,
+        null, null, null, null, null, null);
+  }
+
+  /**
+   * Scans the entity table for all entities of one type within one
+   * application and returns those that pass the given time ranges and
+   * filters, capped at {@code limit} entries.
+   *
+   * When either {@code flowId} or {@code flowRunId} is missing, both are
+   * resolved from the app-to-flow table. {@code null} limits, time bounds and
+   * field sets fall back to their defaults (default limit, the full time
+   * range, and no optional fields respectively).
+   *
+   * @return the matching entities, at most {@code limit} of them; when more
+   *     match, the greatest elements under {@link TimelineEntity}'s natural
+   *     ordering are dropped
+   * @throws IOException if the flow context cannot be resolved or the scan
+   *     fails
+   */
+  @Override
+  public Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    validateParams(userId, clusterId, appId, entityType, null, false);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context = lookupFlowContext(clusterId, appId);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (limit == null) {
+      limit = TimelineReader.DEFAULT_LIMIT;
+    }
+    if (createdTimeBegin == null) {
+      createdTimeBegin = DEFAULT_BEGIN_TIME;
+    }
+    if (createdTimeEnd == null) {
+      createdTimeEnd = DEFAULT_END_TIME;
+    }
+    if (modifiedTimeBegin == null) {
+      modifiedTimeBegin = DEFAULT_BEGIN_TIME;
+    }
+    if (modifiedTimeEnd == null) {
+      modifiedTimeEnd = DEFAULT_END_TIME;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    // Scan only the part of the table whose row keys share the prefix of
+    // this app and entity type.
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+        clusterId, userId, flowId, flowRunId, appId, entityType));
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    // try-with-resources guarantees the scanner is closed, including when
+    // parsing a row throws.
+    try (ResultScanner scanner =
+        entityTable.getResultScanner(hbaseConf, conn, scan)) {
+      for (Result result : scanner) {
+        TimelineEntity entity = parseEntity(result, fieldsToRetrieve,
+            true, createdTimeBegin, createdTimeEnd,
+            true, modifiedTimeBegin, modifiedTimeEnd,
+            isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
+            metricFilters);
+        if (entity == null) {
+          continue;
+        }
+        // Add first, then trim: checking the size before the add would let
+        // the set grow to limit + 1 entries.
+        entities.add(entity);
+        if (entities.size() > limit) {
+          entities.pollLast();
+        }
+      }
+    }
+    return entities;
+  }
+
+  /**
+   * Resolves the flow ID and flow run ID of an application from the
+   * app-to-flow table.
+   *
+   * @param clusterId the cluster the application ran on
+   * @param appId the application ID
+   * @return the flow context stored for the application
+   * @throws IOException if no non-empty row exists for the application, or
+   *     the read fails
+   */
+  private FlowContext lookupFlowContext(String clusterId, String appId)
+      throws IOException {
+    Get get = new Get(AppToFlowRowKey.getRowKey(clusterId, appId));
+    Result row = appToFlowTable.getResult(hbaseConf, conn, get);
+    if (row == null || row.isEmpty()) {
+      throw new IOException(
+          "Unable to find the context flow ID and flow run ID for clusterId=" +
+          clusterId + ", appId=" + appId);
+    }
+    String flowId = AppToFlowColumn.FLOW_ID.readResult(row).toString();
+    long flowRunId =
+        ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(row)).longValue();
+    return new FlowContext(flowId, flowRunId);
+  }
+
+  /**
+   * Simple holder pairing a flow ID with a flow run ID. Both values are set
+   * once at construction and read directly by the enclosing class.
+   */
+  private static class FlowContext {
+    private final String flowId;
+    private final Long flowRunId;
+
+    public FlowContext(String flowId, Long flowRunId) {
+      this.flowId = flowId;
+      this.flowRunId = flowRunId;
+    }
+  }
+
+  /**
+   * Rejects {@code null} values for the mandatory query parameters.
+   *
+   * @param checkEntityId whether {@code entityId} is mandatory as well
+   * @throws NullPointerException if a mandatory parameter is {@code null}
+   */
+  private static void validateParams(String userId, String clusterId,
+      String appId, String entityType, String entityId, boolean checkEntityId) {
+    requireParam(userId, "userId");
+    requireParam(clusterId, "clusterId");
+    requireParam(appId, "appId");
+    requireParam(entityType, "entityType");
+    if (checkEntityId) {
+      requireParam(entityId, "entityId");
+    }
+  }
+
+  /** Fails with "&lt;name&gt; shouldn't be null" when the value is null. */
+  private static void requireParam(Object value, String name) {
+    Preconditions.checkNotNull(value, name + " shouldn't be null");
+  }
+
+  /**
+   * Builds a {@link TimelineEntity} from one HBase row, applying the optional
+   * time-range checks and filters along the way.
+   *
+   * A section (relations, info, configs, events, metrics) is read from the
+   * row when it was requested through {@code fieldsToRetrieve} (directly or
+   * via {@code Field.ALL}) or when a non-empty filter needs it. A section
+   * that was read only for filtering is cleared again before the entity is
+   * returned.
+   *
+   * @param result the HBase row to parse
+   * @param fieldsToRetrieve the optional sections to keep on the returned
+   *     entity; must not be {@code null}
+   * @param checkCreatedTime whether to enforce the created-time range
+   * @param checkModifiedTime whether to enforce the modified-time range
+   * @return the parsed entity, or {@code null} when the row is {@code null} or
+   *     empty, or the entity fails a time check or a filter
+   * @throws IOException if reading a column from the row fails
+   */
+  private static TimelineEntity parseEntity(
+      Result result, EnumSet<Field> fieldsToRetrieve,
+      boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
+      boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
+      Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> eventFilters, Set<String> metricFilters)
+      throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(EntityColumn.TYPE.readResult(result).toString());
+    entity.setId(EntityColumn.ID.readResult(result).toString());
+
+    // fetch created time
+    entity.setCreatedTime(
+        ((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue());
+    if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time; it must go into the modified-time field, otherwise
+    // the created time is overwritten and the range check below tests an
+    // unset value
+    entity.setModifiedTime(
+        ((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue());
+    if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    boolean fetchAll = fieldsToRetrieve.contains(Field.ALL);
+
+    // fetch is related to entities
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    boolean keepIsRelatedTo =
+        fetchAll || fieldsToRetrieve.contains(Field.IS_RELATED_TO);
+    if (keepIsRelatedTo || checkIsRelatedTo) {
+      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO);
+      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!keepIsRelatedTo) {
+        // read only to evaluate the filter
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    boolean keepRelatesTo =
+        fetchAll || fieldsToRetrieve.contains(Field.RELATES_TO);
+    if (keepRelatesTo || checkRelatesTo) {
+      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO);
+      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!keepRelatesTo) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    boolean keepInfo = fetchAll || fieldsToRetrieve.contains(Field.INFO);
+    if (keepInfo || checkInfo) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO);
+      if (checkInfo &&
+          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!keepInfo) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    boolean keepConfigs = fetchAll || fieldsToRetrieve.contains(Field.CONFIGS);
+    if (keepConfigs || checkConfigs) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG);
+      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!keepConfigs) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    boolean keepEvents = fetchAll || fieldsToRetrieve.contains(Field.EVENTS);
+    if (keepEvents || checkEvents) {
+      readEvents(entity, result);
+      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!keepEvents) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    boolean keepMetrics = fetchAll || fieldsToRetrieve.contains(Field.METRICS);
+    if (keepMetrics || checkMetrics) {
+      readMetrics(entity, result);
+      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!keepMetrics) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+
+  /**
+   * Copies one kind of relationship from the row onto the entity. Each column
+   * under the prefix holds an encoded list of IDs; every ID is added under
+   * the column's key, as "is related to" when the prefix is
+   * {@code IS_RELATED_TO} and as "relates to" otherwise.
+   *
+   * @throws IOException if reading the columns from the row fails
+   */
+  private static void readRelationship(
+      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
+      throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns = prefix.readResults(result);
+    boolean isRelatedTo = prefix.equals(EntityColumnPrefix.IS_RELATED_TO);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      String columnKey = column.getKey();
+      Collection<String> ids =
+          Separator.VALUES.splitEncoded(column.getValue().toString());
+      for (String id : ids) {
+        if (isRelatedTo) {
+          entity.addIsRelatedToEntity(columnKey, id);
+        } else {
+          entity.addRelatesToEntity(columnKey, id);
+        }
+      }
+    }
+  }
+
+  /**
+   * Copies the key/value columns under the given prefix onto the entity: as
+   * configs (values converted to strings) when the prefix is {@code CONFIG},
+   * and as info otherwise.
+   *
+   * @throws IOException if reading the columns from the row fails
+   */
+  private static void readKeyValuePairs(
+      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
+      throws IOException {
+    // info and configuration are of type Map<String, Object or String>
+    Map<String, Object> columns = prefix.readResults(result);
+    if (prefix.equals(EntityColumnPrefix.CONFIG)) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        // Store the column's value; using the key here would map every
+        // config name to itself.
+        entity.addConfig(column.getKey(), column.getValue().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
+
+  /**
+   * Rebuilds the entity's events from the event columns of the row.
+   *
+   * Each column name splits into two or three parts: event ID, encoded
+   * timestamp and, optionally, an info key. Columns sharing the same ID and
+   * timestamp are merged into one event, with each three-part column
+   * contributing one info entry.
+   *
+   * @throws IOException if a column name does not have two or three parts, or
+   *     reading the columns from the row fails
+   */
+  private static void readEvents(TimelineEntity entity, Result result)
+      throws IOException {
+    Map<String, TimelineEvent> eventsByKey = new HashMap<>();
+    Map<String, Object> eventColumns =
+        EntityColumnPrefix.EVENT.readResults(result);
+    for (Map.Entry<String, Object> eventColumn : eventColumns.entrySet()) {
+      String columnName = eventColumn.getKey();
+      Collection<String> parts = Separator.VALUES.splitEncoded(columnName);
+      int partCount = parts.size();
+      if (!(partCount == 2 || partCount == 3)) {
+        throw new IOException(
+            "Invalid event column name: " + columnName);
+      }
+      Iterator<String> partItr = parts.iterator();
+      String eventId = partItr.next();
+      String encodedTs = partItr.next();
+      // TODO: timestamp is not correct via ser/des through UTF-8 string
+      byte[] tsBytes = encodedTs.getBytes(StandardCharsets.UTF_8);
+      Long timestamp = TimelineWriterUtils.invert(Bytes.toLong(tsBytes));
+      String eventKey =
+          Separator.VALUES.joinEncoded(eventId, timestamp.toString());
+      TimelineEvent event = eventsByKey.get(eventKey);
+      if (event == null) {
+        // first column seen for this (id, timestamp) pair
+        event = new TimelineEvent();
+        event.setId(eventId);
+        event.setTimestamp(timestamp);
+        eventsByKey.put(eventKey, event);
+      }
+      if (partCount == 3) {
+        String infoKey = partItr.next();
+        event.addInfo(infoKey, eventColumn.getValue());
+      }
+    }
+    Set<TimelineEvent> parsedEvents = new HashSet<>(eventsByKey.values());
+    entity.addEvents(parsedEvents);
+  }
+
+  /**
+   * Rebuilds the entity's metrics from the metric columns of the row,
+   * creating one metric per column with all of its timestamped values.
+   *
+   * @throws IOException if reading the columns from the row fails
+   */
+  private static void readMetrics(TimelineEntity entity, Result result)
+      throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricColumns =
+        EntityColumnPrefix.METRIC.readResultsWithTimestamps(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricColumn :
+        metricColumns.entrySet()) {
+      NavigableMap<Long, Number> values = metricColumn.getValue();
+      // Simply assume that more than one value means a TIME_SERIES metric,
+      // and anything else a SINGLE_VALUE metric
+      TimelineMetric.Type type;
+      if (values.size() > 1) {
+        type = TimelineMetric.Type.TIME_SERIES;
+      } else {
+        type = TimelineMetric.Type.SINGLE_VALUE;
+      }
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricColumn.getKey());
+      metric.setType(type);
+      metric.addValues(values);
+      entity.addMetric(metric);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 3173e87..5290415 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -33,9 +33,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
@@ -55,6 +60,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
private Connection conn;
private TypedBufferedMutator<EntityTable> entityTable;
+ private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
private static final Log LOG = LogFactory
.getLog(HBaseTimelineWriterImpl.class);
@@ -77,6 +83,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
Configuration hbaseConf = HBaseConfiguration.create(conf);
conn = ConnectionFactory.createConnection(hbaseConf);
entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
+ appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
}
/**
@@ -97,7 +104,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
byte[] rowKey =
EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
- te);
+ te.getType(), te.getId());
storeInfo(rowKey, te, flowVersion);
storeEvents(rowKey, te.getEvents());
@@ -107,11 +114,37 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
EntityColumnPrefix.IS_RELATED_TO);
storeRelations(rowKey, te.getRelatesToEntities(),
EntityColumnPrefix.RELATES_TO);
- }
+ if (isApplicationCreated(te)) {
+ onApplicationCreated(
+ clusterId, userId, flowName, flowVersion, flowRunId, appId, te);
+ }
+ }
return putStatus;
}
+  /**
+   * Tells whether the entity is a YARN application entity that carries the
+   * application created event.
+   *
+   * @param te the entity being written
+   * @return true if the entity type is
+   *         {@link TimelineEntityType#YARN_APPLICATION} and one of its events
+   *         has the id {@link ApplicationMetricsConstants#CREATED_EVENT_TYPE},
+   *         false otherwise
+   */
+  private static boolean isApplicationCreated(TimelineEntity te) {
+    // Constant-first comparisons: an entity without a type or an event without
+    // an id simply does not match instead of raising a NullPointerException.
+    String applicationType = TimelineEntityType.YARN_APPLICATION.toString();
+    if (!applicationType.equals(te.getType())) {
+      return false;
+    }
+    for (TimelineEvent event : te.getEvents()) {
+      if (ApplicationMetricsConstants.CREATED_EVENT_TYPE.equals(
+          event.getId())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Stores the flow name and the flow run id of a newly created application
+   * in the app_flow table, under the row key built from the cluster id and
+   * the application id.
+   *
+   * The userId, flowVersion and te arguments are accepted but not read by
+   * this method.
+   *
+   * @throws IOException if either column cannot be stored
+   */
+  private void onApplicationCreated(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    final byte[] mappingRowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    // No explicit cell timestamp is supplied (null) for either column.
+    AppToFlowColumn.FLOW_ID.store(
+        mappingRowKey, appToFlowTable, null, flowName);
+    AppToFlowColumn.FLOW_RUN_ID.store(
+        mappingRowKey, appToFlowTable, null, flowRunId);
+  }
+
/**
* Stores the Relations from the {@linkplain TimelineEntity} object
*/
@@ -245,6 +278,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
public void flush() throws IOException {
// flush all buffered mutators
entityTable.flush();
+ appToFlowTable.flush();
}
/**
@@ -258,6 +292,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// The close API performs flushing and releases any resources held
entityTable.close();
}
+ if (appToFlowTable != null) {
+ LOG.info("closing app_flow table");
+ // The close API performs flushing and releases any resources held
+ appToFlowTable.close();
+ }
if (conn != null) {
LOG.info("closing the hbase Connection");
conn.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index a5cc2ab..2c3897d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
/**
@@ -70,6 +71,11 @@ public class TimelineSchemaCreator {
int metricsTTL = Integer.parseInt(entityTableTTLMetrics);
new EntityTable().setMetricsTTL(metricsTTL, hbaseConf);
}
+ // Grab the appToflowTableName argument
+ String appToflowTableName = commandLine.getOptionValue("a2f");
+ if (StringUtils.isNotBlank(appToflowTableName)) {
+ hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
+ }
createAllTables(hbaseConf);
}
@@ -95,6 +101,11 @@ public class TimelineSchemaCreator {
o.setRequired(false);
options.addOption(o);
+ o = new Option("a2f", "appToflowTableName", true, "app to flow table name");
+ o.setArgName("appToflowTableName");
+ o.setRequired(false);
+ options.addOption(o);
+
CommandLineParser parser = new PosixParser();
CommandLine commandLine = null;
try {
@@ -120,6 +131,7 @@ public class TimelineSchemaCreator {
throw new IOException("Cannot create table since admin is null");
}
new EntityTable().createTable(admin, hbaseConf);
+ new AppToFlowTable().createTable(admin, hbaseConf);
} finally {
if (conn != null) {
conn.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
new file mode 100644
index 0000000..423037a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+import java.io.IOException;
+
+/**
+ * Enumerates the fully qualified columns of the {@link AppToFlowTable}.
+ */
+public enum AppToFlowColumn implements Column<AppToFlowTable> {
+
+  /** The flow ID, stored under the qualifier {@code flow_id}. */
+  FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"),
+
+  /** The flow run ID, stored under the qualifier {@code flow_run_id}. */
+  FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id");
+
+  /** Helper that performs the actual reads and writes for the family. */
+  private final ColumnHelper<AppToFlowTable> column;
+  /** Column family this column belongs to. */
+  private final ColumnFamily<AppToFlowTable> columnFamily;
+  /** Qualifier as given to the constructor; used for lookups by name. */
+  private final String columnQualifier;
+  /** Encoded qualifier bytes handed to the helper on every read and write. */
+  private final byte[] columnQualifierBytes;
+
+  AppToFlowColumn(ColumnFamily<AppToFlowTable> columnFamily,
+      String columnQualifier) {
+    this.columnFamily = columnFamily;
+    this.column = new ColumnHelper<AppToFlowTable>(columnFamily);
+    this.columnQualifier = columnQualifier;
+    // Future-proofing: the qualifier goes through the SPACE separator
+    // encoding before its bytes are taken, for column prefix hygiene.
+    String encodedQualifier = Separator.SPACE.encode(columnQualifier);
+    this.columnQualifierBytes = Bytes.toBytes(encodedQualifier);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  /**
+   * Hands the value to the {@link ColumnHelper} for this column's family,
+   * together with this column's encoded qualifier.
+   *
+   * @param rowKey row the value is stored in
+   * @param tableMutator mutator of the app_flow table used for the write
+   * @param timestamp passed through to the helper unchanged; may be null
+   * @param inputValue the value to store
+   * @throws IOException if the helper fails to store the value
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
+      Object inputValue) throws IOException {
+    column.store(
+        rowKey, tableMutator, columnQualifierBytes, timestamp, inputValue);
+  }
+
+  /**
+   * Reads this column from the given row through the {@link ColumnHelper}.
+   *
+   * @param result the row to read from
+   * @return whatever the helper returns for this column's qualifier
+   * @throws IOException if the helper fails to read the value
+   */
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Looks up an {@link AppToFlowColumn} by its name. The following holds
+   * true: {@code columnFor(x) == columnFor(y)} if and only if
+   * {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link AppToFlowColumn}, or null if there is no
+   *         match
+   */
+  public static final AppToFlowColumn columnFor(String columnQualifier) {
+    // Only the name is compared; the column family is assumed to match.
+    for (AppToFlowColumn candidate : AppToFlowColumn.values()) {
+      if (!candidate.getColumnQualifier().equals(columnQualifier)) {
+        continue;
+      }
+      return candidate;
+    }
+    // Nothing matched.
+    return null;
+  }
+
+  /**
+   * Looks up an {@link AppToFlowColumn} by column family and name. The
+   * following holds true: {@code columnFor(a,x) == columnFor(b,y)} if and only
+   * if {@code a.equals(b) & x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link AppToFlowColumn} or null if both arguments
+   *         don't match.
+   */
+  public static final AppToFlowColumn columnFor(
+      AppToFlowColumnFamily columnFamily, String name) {
+    for (AppToFlowColumn candidate : AppToFlowColumn.values()) {
+      // The family has to match first, then the name.
+      if (!candidate.columnFamily.equals(columnFamily)) {
+        continue;
+      }
+      if (candidate.getColumnQualifier().equals(name)) {
+        return candidate;
+      }
+    }
+    // Nothing matched.
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java
new file mode 100644
index 0000000..e74235f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumnFamily.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Column families of the app_flow table.
+ */
+public enum AppToFlowColumnFamily implements ColumnFamily<AppToFlowTable> {
+  /**
+   * The mapping column family; it houses the known columns such as the flow
+   * id and the flow run id.
+   */
+  MAPPING("m");
+
+  /**
+   * Encoded name of this column family as bytes.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value name of the column family to create. Must be lower case and
+   *          without spaces.
+   */
+  AppToFlowColumnFamily(String value) {
+    // Column family names are expected to be lower case without spaces; the
+    // name is passed through the SPACE separator encoding before conversion.
+    String encodedName = Separator.SPACE.encode(value);
+    this.bytes = Bytes.toBytes(encodedName);
+  }
+
+  /**
+   * @return a copy of the column family bytes, so that callers cannot modify
+   *         the array held by this constant
+   */
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
new file mode 100644
index 0000000..ad4fec6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Builds row keys for the app_flow table.
+ */
+public class AppToFlowRowKey {
+  /**
+   * Constructs a row key for the app_flow table as follows:
+   * {@code clusterId!AppId}
+   *
+   * @param clusterId id of the cluster; first component of the key
+   * @param appId id of the application; second component of the key
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String appId) {
+    String joinedKey = Separator.QUALIFIERS.joinEncoded(clusterId, appId);
+    return Bytes.toBytes(joinedKey);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
new file mode 100644
index 0000000..2467856
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+
+import java.io.IOException;
+
+/**
+ * The app_flow table has a single column family, mapping, which stores the
+ * flow id and the flow run id that an application belongs to.
+ *
+ * Example app_flow table record:
+ *
+ * <pre>
+ * |--------------------------------------|
+ * |  Row       | Column Family           |
+ * |  key       | mapping                 |
+ * |--------------------------------------|
+ * | clusterId! | flow_id:                |
+ * | AppId      | foo@daily_hive_report   |
+ * |            |                         |
+ * |            | flow_run_id:            |
+ * |            | 1452828720457           |
+ * |--------------------------------------|
+ * </pre>
+ */
+public class AppToFlowTable extends BaseTable<AppToFlowTable> {
+  /** Common prefix of the app_flow configuration parameters. */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow";
+
+  /** Name of the config parameter that specifies the app_flow table name. */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** Default value for the app_flow table name. */
+  private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow";
+
+  private static final Log LOG = LogFactory.getLog(AppToFlowTable.class);
+
+  public AppToFlowTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * BaseTable#createTable(org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName tableName = getTableName(hbaseConf);
+    if (admin.tableExists(tableName)) {
+      // An existing table is never disabled or deleted; this mirrors what
+      // map-reduce jobs do when their output directory already exists.
+      throw new IOException("Table " + tableName.getNameAsString()
+          + " already exists.");
+    }
+
+    // The only column family of the table, with a row+column bloom filter.
+    HColumnDescriptor mappingFamily =
+        new HColumnDescriptor(AppToFlowColumnFamily.MAPPING.getBytes());
+    mappingFamily.setBloomFilterType(BloomType.ROWCOL);
+
+    HTableDescriptor descriptor = new HTableDescriptor(tableName);
+    descriptor.addFamily(mappingFamily);
+    descriptor.setRegionSplitPolicyClassName(
+        "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    descriptor.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+
+    // NOTE(review): the pre-split points come from getUsernameSplits(), while
+    // AppToFlowRowKey builds keys that start with the cluster id - confirm
+    // that these split points are intended for this table.
+    admin.createTable(descriptor,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + tableName.getNameAsString()
+        + "=" + admin.tableExists(tableName));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
new file mode 100644
index 0000000..df7ffc1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
index e8d8b5c..abba79a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@@ -94,6 +96,20 @@ public abstract class BaseTable<T> {
}
/**
+   * Fetches a single row from this table.
+   *
+   * @param hbaseConf used to read settings that override defaults
+   * @param conn used to create table from
+   * @param get that specifies what single row you want to get from this table
+   * @return result of get operation
+   * @throws IOException if the table cannot be obtained, the get fails, or
+   *           the table cannot be closed
+   */
+  public Result getResult(Configuration hbaseConf, Connection conn, Get get)
+      throws IOException {
+    // A Table obtained from a Connection has to be closed by whoever asked
+    // for it; try-with-resources releases it on both the normal and the
+    // exceptional path.
+    try (Table table = conn.getTable(getTableName(hbaseConf))) {
+      return table.get(get);
+    }
+  }
+
+ /**
* Get the table name for this table.
*
* @param hbaseConf
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index 671c824..509ff49 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -64,7 +64,7 @@ public interface ColumnPrefix<T> {
public Object readResult(Result result, String qualifier) throws IOException;
/**
- * @param resultfrom which to read columns
+ * @param result from which to read columns
* @return the latest values of columns in the column family with this prefix
* (or all of them if the prefix value is null).
* @throws IOException
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
deleted file mode 100644
index 5518a27..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * contains the constants used in the context of schema accesses for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * information
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TimelineEntitySchemaConstants {
-
- /**
- * Used to create a pre-split for tables starting with a username in the
- * prefix. TODO: this may have to become a config variable (string with
- * separators) so that different installations can presplit based on their own
- * commonly occurring names.
- */
- private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
- Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
- Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
- Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
- Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
- Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
- Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
- Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
- Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
- Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
- Bytes.toBytes("z") };
-
- /**
- * The length at which keys auto-split
- */
- public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
-
- /**
- * @return splits for splits where a user is a prefix.
- */
- public final static byte[][] getUsernameSplits() {
- byte[][] kloon = USERNAME_SPLITS.clone();
- // Deep copy.
- for (int row = 0; row < USERNAME_SPLITS.length; row++) {
- kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
- }
- return kloon;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
new file mode 100644
index 0000000..bbf498a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Contains the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TimelineHBaseSchemaConstants {
+
+ /**
+ * Used to create a pre-split for tables starting with a username in the
+ * prefix. TODO: this may have to become a config variable (string with
+ * separators) so that different installations can presplit based on their own
+ * commonly occurring names.
+ */
+ private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
+ Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
+ Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
+ Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
+ Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
+ Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
+ Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
+ Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
+ Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
+ Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
+ Bytes.toBytes("z") };
+
+ /**
+ * Key prefix length, as a string, for KeyPrefixRegionSplitPolicy.prefix_length.
+ */
+ public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
+
+ /**
+ * @return a deep copy of the split keys for tables keyed by a username prefix.
+ */
+ public final static byte[][] getUsernameSplits() {
+ byte[][] kloon = USERNAME_SPLITS.clone();
+ // clone() is shallow; copy each row so callers cannot alter the constants.
+ for (int row = 0; row < USERNAME_SPLITS.length; row++) {
+ kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
+ }
+ return kloon;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
new file mode 100644
index 0000000..91d7ba4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class TimelineReaderUtils {
+ /**
+ * Checks that the relations of an entity satisfy every relation filter.
+ * @param entityRelations the relations of an entity
+ * @param relationFilters the relations for filtering
+ * @return true if every filter key is present with all of its filter ids
+ */
+ public static boolean matchRelations(
+ Map<String, Set<String>> entityRelations,
+ Map<String, Set<String>> relationFilters) {
+ for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
+ Set<String> ids = entityRelations.get(relation.getKey());
+ if (ids == null) {
+ return false;
+ }
+ for (String id : relation.getValue()) {
+ if (!ids.contains(id)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Checks that an entity's key/value pairs satisfy every key/value filter.
+ * @param map the map of key/value pairs in an entity
+ * @param filters the map of key/value pairs for filtering
+ * @return true if every filter key maps to a non-null, equal value in map
+ */
+ public static boolean matchFilters(Map<String, ? extends Object> map,
+ Map<String, ? extends Object> filters) {
+ for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+ Object value = map.get(filter.getKey());
+ if (value == null) {
+ return false;
+ }
+ if (!value.equals(filter.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Checks that every filtered event id occurs among the entity's events.
+ * @param entityEvents the set of event objects in an entity
+ * @param eventFilters the set of event Ids for filtering
+ * @return true if each id in eventFilters is the id of some entity event
+ */
+ public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+ Set<String> eventFilters) {
+ Set<String> eventIds = new HashSet<String>();
+ for (TimelineEvent event : entityEvents) {
+ eventIds.add(event.getId());
+ }
+ for (String eventFilter : eventFilters) {
+ if (!eventIds.contains(eventFilter)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Checks that every filtered metric id occurs among the entity's metrics.
+ * @param metrics the set of metric objects in an entity
+ * @param metricFilters the set of metric Ids for filtering
+ * @return true if each id in metricFilters is the id of some entity metric
+ */
+ public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+ Set<String> metricFilters) {
+ Set<String> metricIds = new HashSet<String>();
+ for (TimelineMetric metric : metrics) {
+ metricIds.add(metric.getId());
+ }
+
+ for (String metricFilter : metricFilters) {
+ if (!metricIds.contains(metricFilter)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 90da966..26e7748 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -62,7 +62,7 @@ public enum EntityColumn implements Column<EntityTable> {
private final String columnQualifier;
private final byte[] columnQualifierBytes;
- private EntityColumn(ColumnFamily<EntityTable> columnFamily,
+ EntityColumn(ColumnFamily<EntityTable> columnFamily,
String columnQualifier) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
index 8a95d12..7c63727 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
@@ -53,7 +53,7 @@ public enum EntityColumnFamily implements ColumnFamily<EntityTable> {
* @param value create a column family with this name. Must be lower case and
* without spaces.
*/
- private EntityColumnFamily(String value) {
+ EntityColumnFamily(String value) {
// column families should be lower case and not contain any spaces.
this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 8b7bc3e..58272ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -80,7 +80,7 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
* @param columnFamily that this column is stored in.
* @param columnPrefix for this column.
*/
- private EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+ EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
String columnPrefix) {
column = new ColumnHelper<EntityTable>(columnFamily);
this.columnFamily = columnFamily;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 3e17ad0..9a72be0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -55,17 +55,45 @@ public class EntityRowKey {
/**
* Constructs a row key prefix for the entity table as follows:
- * {@code userName!clusterId!flowId!flowRunId!AppId}
+ * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!}
*
* @param clusterId
* @param userId
* @param flowId
* @param flowRunId
* @param appId
+ * @param entityType
* @return byte array with the row key prefix
*/
+ public static byte[] getRowKeyPrefix(String clusterId, String userId,
+ String flowId, Long flowRunId, String appId, String entityType) {
+ byte[] first =
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+ flowId));
+ // flowRunId is a long, not a string, so it cannot go through joinEncoded
+ // with the other parts; its bytes are joined in as a separate segment.
+ byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+ byte[] third =
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, ""));
+ return Separator.QUALIFIERS.join(first, second, third);
+ }
+
+ /**
+ * Constructs a row key for the entity table as follows:
+ * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId}
+ *
+ * @param clusterId
+ * @param userId
+ * @param flowId
+ * @param flowRunId
+ * @param appId
+ * @param entityType
+ * @param entityId
+ * @return byte array with the row key
+ */
public static byte[] getRowKey(String clusterId, String userId,
- String flowId, Long flowRunId, String appId, TimelineEntity te) {
+ String flowId, Long flowRunId, String appId, String entityType,
+ String entityId) {
byte[] first =
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
flowId));
@@ -73,8 +101,8 @@ public class EntityRowKey {
// time.
byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
byte[] third =
- Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
- te.getId()));
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
+ entityId));
return Separator.QUALIFIERS.join(first, second, third);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bed3fb3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
index 2ae7d39..f657a14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
/**
* The entity table as column families info, config and metrics. Info stores
@@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEnti
public class EntityTable extends BaseTable<EntityTable> {
/** entity prefix */
private static final String PREFIX =
- YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".entity";
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity";
/** config param name that specifies the entity table name */
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
@@ -146,9 +146,9 @@ public class EntityTable extends BaseTable<EntityTable> {
entityTableDescp
.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
- TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+ TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
admin.createTable(entityTableDescp,
- TimelineEntitySchemaConstants.getUsernameSplits());
+ TimelineHBaseSchemaConstants.getUsernameSplits());
LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ admin.tableExists(table));
}