You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:14:06 UTC
[49/50] [abbrv] hadoop git commit: YARN-4200. Refactor reader classes
in storage to nest under hbase specific package name. Contributed by Li Lu.
YARN-4200. Refactor reader classes in storage to nest under hbase
specific package name. Contributed by Li Lu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/63d90992
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/63d90992
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/63d90992
Branch: refs/heads/feature-YARN-2928
Commit: 63d909927b4dd123f43523c1449033b419f51834
Parents: 5157c30
Author: Li Lu <gt...@apache.org>
Authored: Mon Jan 11 18:05:36 2016 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 18:03:31 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../storage/ApplicationEntityReader.java | 382 --------------
.../storage/FlowActivityEntityReader.java | 163 ------
.../storage/FlowRunEntityReader.java | 225 ---------
.../storage/GenericEntityReader.java | 496 ------------------
.../storage/HBaseTimelineReaderImpl.java | 2 +
.../storage/TimelineEntityReader.java | 274 ----------
.../storage/TimelineEntityReaderFactory.java | 100 ----
.../storage/reader/ApplicationEntityReader.java | 383 ++++++++++++++
.../reader/FlowActivityEntityReader.java | 164 ++++++
.../storage/reader/FlowRunEntityReader.java | 226 +++++++++
.../storage/reader/GenericEntityReader.java | 497 +++++++++++++++++++
.../storage/reader/TimelineEntityReader.java | 274 ++++++++++
.../reader/TimelineEntityReaderFactory.java | 100 ++++
.../storage/reader/package-info.java | 23 +
15 files changed, 1672 insertions(+), 1640 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7827d77..1a0ed56 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -129,6 +129,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
IMPROVEMENTS
+ YARN-4200. Refactor reader classes in storage to nest under hbase specific
+ package name. Contributed by Li Lu.
+
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
zjshen)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
deleted file mode 100644
index d812a6c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for application entities that are stored in the
- * application table.
- */
-class ApplicationEntityReader extends GenericEntityReader {
- private static final ApplicationTable APPLICATION_TABLE =
- new ApplicationTable();
-
- public ApplicationEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- Long limit, Long createdTimeBegin, Long createdTimeEnd,
- Long modifiedTimeBegin, Long modifiedTimeEnd,
- Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
- Map<String, Object> infoFilters, Map<String, String> configFilters,
- Set<String> metricFilters, Set<String> eventFilters,
- TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
- EnumSet<Field> fieldsToRetrieve) {
- super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
- createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
- relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
- true);
- }
-
- public ApplicationEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- String entityId, TimelineFilterList confsToRetrieve,
- TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
- super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
- confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
- }
-
- /**
- * Uses the {@link ApplicationTable}.
- */
- protected BaseTable<?> getTable() {
- return APPLICATION_TABLE;
- }
-
- @Override
- protected FilterList constructFilterListBasedOnFields() {
- FilterList list = new FilterList(Operator.MUST_PASS_ONE);
- // Fetch all the columns.
- if (fieldsToRetrieve.contains(Field.ALL) &&
- (confsToRetrieve == null ||
- confsToRetrieve.getFilterList().isEmpty()) &&
- (metricsToRetrieve == null ||
- metricsToRetrieve.getFilterList().isEmpty())) {
- return list;
- }
- FilterList infoColFamilyList = new FilterList();
- // By default fetch everything in INFO column family.
- FamilyFilter infoColumnFamily =
- new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
- infoColFamilyList.addFilter(infoColumnFamily);
- // Events not required.
- if (!fieldsToRetrieve.contains(Field.EVENTS) &&
- !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
- }
- // info not required.
- if (!fieldsToRetrieve.contains(Field.INFO) &&
- !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
- }
- // is releated to not required.
- if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
- !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
- }
- // relates to not required.
- if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
- !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
- }
- list.addFilter(infoColFamilyList);
- if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
- (confsToRetrieve != null &&
- !confsToRetrieve.getFilterList().isEmpty())) {
- FilterList filterCfg =
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
- if (confsToRetrieve != null &&
- !confsToRetrieve.getFilterList().isEmpty()) {
- filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
- ApplicationColumnPrefix.CONFIG, confsToRetrieve));
- }
- list.addFilter(filterCfg);
- }
- if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
- (metricsToRetrieve != null &&
- !metricsToRetrieve.getFilterList().isEmpty())) {
- FilterList filterMetrics =
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
- if (metricsToRetrieve != null &&
- !metricsToRetrieve.getFilterList().isEmpty()) {
- filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
- ApplicationColumnPrefix.METRIC, metricsToRetrieve));
- }
- list.addFilter(filterMetrics);
- }
- return list;
- }
-
- @Override
- protected Result getResult(Configuration hbaseConf, Connection conn,
- FilterList filterList) throws IOException {
- byte[] rowKey =
- ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
- appId);
- Get get = new Get(rowKey);
- get.setMaxVersions(Integer.MAX_VALUE);
- if (filterList != null && !filterList.getFilters().isEmpty()) {
- get.setFilter(filterList);
- }
- return table.getResult(hbaseConf, conn, get);
- }
-
- @Override
- protected void validateParams() {
- Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
- Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
- if (singleEntityRead) {
- Preconditions.checkNotNull(appId, "appId shouldn't be null");
- } else {
- Preconditions.checkNotNull(userId, "userId shouldn't be null");
- Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
- }
- }
-
- @Override
- protected void augmentParams(Configuration hbaseConf, Connection conn)
- throws IOException {
- if (singleEntityRead) {
- if (flowName == null || flowRunId == null || userId == null) {
- FlowContext context =
- lookupFlowContext(clusterId, appId, hbaseConf, conn);
- flowName = context.flowName;
- flowRunId = context.flowRunId;
- userId = context.userId;
- }
- }
- if (fieldsToRetrieve == null) {
- fieldsToRetrieve = EnumSet.noneOf(Field.class);
- }
- if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
- confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
- fieldsToRetrieve.add(Field.CONFIGS);
- }
- if (!fieldsToRetrieve.contains(Field.METRICS) &&
- metricsToRetrieve != null &&
- !metricsToRetrieve.getFilterList().isEmpty()) {
- fieldsToRetrieve.add(Field.METRICS);
- }
- if (!singleEntityRead) {
- if (limit == null || limit < 0) {
- limit = TimelineReader.DEFAULT_LIMIT;
- }
- if (createdTimeBegin == null) {
- createdTimeBegin = DEFAULT_BEGIN_TIME;
- }
- if (createdTimeEnd == null) {
- createdTimeEnd = DEFAULT_END_TIME;
- }
- if (modifiedTimeBegin == null) {
- modifiedTimeBegin = DEFAULT_BEGIN_TIME;
- }
- if (modifiedTimeEnd == null) {
- modifiedTimeEnd = DEFAULT_END_TIME;
- }
- }
- }
-
- @Override
- protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn, FilterList filterList) throws IOException {
- Scan scan = new Scan();
- if (flowRunId != null) {
- scan.setRowPrefixFilter(ApplicationRowKey.
- getRowKeyPrefix(clusterId, userId, flowName, flowRunId));
- } else {
- scan.setRowPrefixFilter(ApplicationRowKey.
- getRowKeyPrefix(clusterId, userId, flowName));
- }
- FilterList newList = new FilterList();
- newList.addFilter(new PageFilter(limit));
- if (filterList != null && !filterList.getFilters().isEmpty()) {
- newList.addFilter(filterList);
- }
- scan.setFilter(newList);
- return table.getResultScanner(hbaseConf, conn, scan);
- }
-
- @Override
- protected TimelineEntity parseEntity(Result result) throws IOException {
- if (result == null || result.isEmpty()) {
- return null;
- }
- TimelineEntity entity = new TimelineEntity();
- entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
- String entityId = ApplicationColumn.ID.readResult(result).toString();
- entity.setId(entityId);
-
- // fetch created time
- Number createdTime =
- (Number)ApplicationColumn.CREATED_TIME.readResult(result);
- entity.setCreatedTime(createdTime.longValue());
- if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
- entity.getCreatedTime() > createdTimeEnd)) {
- return null;
- }
-
- // fetch modified time
- Number modifiedTime =
- (Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
- entity.setModifiedTime(modifiedTime.longValue());
- if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
- entity.getModifiedTime() > modifiedTimeEnd)) {
- return null;
- }
-
- // fetch is related to entities
- boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
- readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
- true);
- if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
- entity.getIsRelatedToEntities(), isRelatedTo)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
- entity.getIsRelatedToEntities().clear();
- }
- }
-
- // fetch relates to entities
- boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
- readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
- false);
- if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
- entity.getRelatesToEntities(), relatesTo)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.RELATES_TO)) {
- entity.getRelatesToEntities().clear();
- }
- }
-
- // fetch info
- boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
- readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
- if (checkInfo &&
- !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.INFO)) {
- entity.getInfo().clear();
- }
- }
-
- // fetch configs
- boolean checkConfigs = configFilters != null && configFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
- readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
- if (checkConfigs && !TimelineStorageUtils.matchFilters(
- entity.getConfigs(), configFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.CONFIGS)) {
- entity.getConfigs().clear();
- }
- }
-
- // fetch events
- boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
- readEvents(entity, result, true);
- if (checkEvents && !TimelineStorageUtils.matchEventFilters(
- entity.getEvents(), eventFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.EVENTS)) {
- entity.getEvents().clear();
- }
- }
-
- // fetch metrics
- boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
- readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
- if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
- entity.getMetrics(), metricFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.METRICS)) {
- entity.getMetrics().clear();
- }
- }
- return entity;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
deleted file mode 100644
index 7e8d4ba..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow activity entities that are stored in the
- * flow activity table.
- */
-class FlowActivityEntityReader extends TimelineEntityReader {
- private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
- new FlowActivityTable();
-
- public FlowActivityEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- Long limit, Long createdTimeBegin, Long createdTimeEnd,
- Long modifiedTimeBegin, Long modifiedTimeEnd,
- Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
- Map<String, Object> infoFilters, Map<String, String> configFilters,
- Set<String> metricFilters, Set<String> eventFilters,
- EnumSet<Field> fieldsToRetrieve) {
- super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
- createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
- relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, null, null, fieldsToRetrieve, true);
- }
-
- public FlowActivityEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- String entityId, EnumSet<Field> fieldsToRetrieve) {
- super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
- null, null, fieldsToRetrieve);
- }
-
- /**
- * Uses the {@link FlowActivityTable}.
- */
- @Override
- protected BaseTable<?> getTable() {
- return FLOW_ACTIVITY_TABLE;
- }
-
- @Override
- protected void validateParams() {
- Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
- }
-
- @Override
- protected void augmentParams(Configuration hbaseConf, Connection conn)
- throws IOException {
- if (limit == null || limit < 0) {
- limit = TimelineReader.DEFAULT_LIMIT;
- }
- if (createdTimeBegin == null) {
- createdTimeBegin = DEFAULT_BEGIN_TIME;
- }
- if (createdTimeEnd == null) {
- createdTimeEnd = DEFAULT_END_TIME;
- }
- }
-
- @Override
- protected FilterList constructFilterListBasedOnFields() {
- return null;
- }
-
- @Override
- protected Result getResult(Configuration hbaseConf, Connection conn,
- FilterList filterList) throws IOException {
- throw new UnsupportedOperationException(
- "we don't support a single entity query");
- }
-
- @Override
- protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn, FilterList filterList) throws IOException {
- Scan scan = new Scan();
- if (createdTimeBegin == DEFAULT_BEGIN_TIME &&
- createdTimeEnd == DEFAULT_END_TIME) {
- scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
- } else {
- scan.setStartRow(
- FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd));
- scan.setStopRow(
- FlowActivityRowKey.getRowKeyPrefix(clusterId,
- (createdTimeBegin <= 0 ? 0: (createdTimeBegin - 1))));
- }
- // use the page filter to limit the result to the page size
- // the scanner may still return more than the limit; therefore we need to
- // read the right number as we iterate
- scan.setFilter(new PageFilter(limit));
- return table.getResultScanner(hbaseConf, conn, scan);
- }
-
- @Override
- protected TimelineEntity parseEntity(Result result) throws IOException {
- FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
-
- long time = rowKey.getDayTimestamp();
- String user = rowKey.getUserId();
- String flowName = rowKey.getFlowName();
-
- FlowActivityEntity flowActivity =
- new FlowActivityEntity(clusterId, time, user, flowName);
- // set the id
- flowActivity.setId(flowActivity.getId());
- // get the list of run ids along with the version that are associated with
- // this flow on this day
- Map<String, Object> runIdsMap =
- FlowActivityColumnPrefix.RUN_ID.readResults(result);
- for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
- Long runId = Long.valueOf(e.getKey());
- String version = (String)e.getValue();
- FlowRunEntity flowRun = new FlowRunEntity();
- flowRun.setUser(user);
- flowRun.setName(flowName);
- flowRun.setRunId(runId);
- flowRun.setVersion(version);
- // set the id
- flowRun.setId(flowRun.getId());
- flowActivity.addFlowRun(flowRun);
- }
-
- return flowActivity;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
deleted file mode 100644
index c9076ee..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow run entities that are stored in the flow run
- * table.
- */
-class FlowRunEntityReader extends TimelineEntityReader {
- private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
-
- public FlowRunEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- Long limit, Long createdTimeBegin, Long createdTimeEnd,
- Long modifiedTimeBegin, Long modifiedTimeEnd,
- Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
- Map<String, Object> infoFilters, Map<String, String> configFilters,
- Set<String> metricFilters, Set<String> eventFilters,
- TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
- EnumSet<Field> fieldsToRetrieve) {
- super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
- createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
- relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
- }
-
- public FlowRunEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- String entityId, TimelineFilterList confsToRetrieve,
- TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
- super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
- null, metricsToRetrieve, fieldsToRetrieve);
- }
-
- /**
- * Uses the {@link FlowRunTable}.
- */
- @Override
- protected BaseTable<?> getTable() {
- return FLOW_RUN_TABLE;
- }
-
- @Override
- protected void validateParams() {
- Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
- Preconditions.checkNotNull(userId, "userId shouldn't be null");
- Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
- if (singleEntityRead) {
- Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
- }
- }
-
- @Override
- protected void augmentParams(Configuration hbaseConf, Connection conn) {
- if (!singleEntityRead) {
- if (fieldsToRetrieve == null) {
- fieldsToRetrieve = EnumSet.noneOf(Field.class);
- }
- if (limit == null || limit < 0) {
- limit = TimelineReader.DEFAULT_LIMIT;
- }
- if (createdTimeBegin == null) {
- createdTimeBegin = DEFAULT_BEGIN_TIME;
- }
- if (createdTimeEnd == null) {
- createdTimeEnd = DEFAULT_END_TIME;
- }
- if (!fieldsToRetrieve.contains(Field.METRICS) &&
- metricsToRetrieve != null &&
- !metricsToRetrieve.getFilterList().isEmpty()) {
- fieldsToRetrieve.add(Field.METRICS);
- }
- }
- }
-
- @Override
- protected FilterList constructFilterListBasedOnFields() {
- FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-
- // By default fetch everything in INFO column family.
- FamilyFilter infoColumnFamily =
- new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
- // Metrics not required.
- if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) &&
- !fieldsToRetrieve.contains(Field.ALL)) {
- FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
- infoColFamilyList.addFilter(infoColumnFamily);
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
- list.addFilter(infoColFamilyList);
- }
- if (metricsToRetrieve != null &&
- !metricsToRetrieve.getFilterList().isEmpty()) {
- FilterList infoColFamilyList = new FilterList();
- infoColFamilyList.addFilter(infoColumnFamily);
- infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
- FlowRunColumnPrefix.METRIC, metricsToRetrieve));
- list.addFilter(infoColFamilyList);
- }
- return list;
- }
-
- @Override
- protected Result getResult(Configuration hbaseConf, Connection conn,
- FilterList filterList) throws IOException {
- byte[] rowKey =
- FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
- Get get = new Get(rowKey);
- get.setMaxVersions(Integer.MAX_VALUE);
- if (filterList != null && !filterList.getFilters().isEmpty()) {
- get.setFilter(filterList);
- }
- return table.getResult(hbaseConf, conn, get);
- }
-
- @Override
- protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn, FilterList filterList) throws IOException {
- Scan scan = new Scan();
- scan.setRowPrefixFilter(
- FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName));
- FilterList newList = new FilterList();
- newList.addFilter(new PageFilter(limit));
- if (filterList != null && !filterList.getFilters().isEmpty()) {
- newList.addFilter(filterList);
- }
- scan.setFilter(newList);
- return table.getResultScanner(hbaseConf, conn, scan);
- }
-
- @Override
- protected TimelineEntity parseEntity(Result result) throws IOException {
- FlowRunEntity flowRun = new FlowRunEntity();
- flowRun.setUser(userId);
- flowRun.setName(flowName);
- if (singleEntityRead) {
- flowRun.setRunId(flowRunId);
- } else {
- FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
- flowRun.setRunId(rowKey.getFlowRunId());
- }
-
- // read the start time
- Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
- if (startTime != null) {
- flowRun.setStartTime(startTime.longValue());
- }
- if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin ||
- flowRun.getStartTime() > createdTimeEnd)) {
- return null;
- }
-
- // read the end time if available
- Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
- if (endTime != null) {
- flowRun.setMaxEndTime(endTime.longValue());
- }
-
- // read the flow version
- String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
- if (version != null) {
- flowRun.setVersion(version);
- }
-
- // read metrics
- if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) {
- readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
- }
-
- // set the id
- flowRun.setId(flowRun.getId());
- return flowRun;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
deleted file mode 100644
index 784dfd5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for generic entities that are stored in the entity
- * table.
- */
-class GenericEntityReader extends TimelineEntityReader {
- private static final EntityTable ENTITY_TABLE = new EntityTable();
- private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
-
- /**
- * Used to look up the flow context.
- */
- private final AppToFlowTable appToFlowTable = new AppToFlowTable();
-
- public GenericEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- Long limit, Long createdTimeBegin, Long createdTimeEnd,
- Long modifiedTimeBegin, Long modifiedTimeEnd,
- Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
- Map<String, Object> infoFilters, Map<String, String> configFilters,
- Set<String> metricFilters, Set<String> eventFilters,
- TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
- EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
- super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
- createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
- relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
- eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
- sortedKeys);
- }
-
- public GenericEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- String entityId, TimelineFilterList confsToRetrieve,
- TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
- super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
- confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
- }
-
- /**
- * Uses the {@link EntityTable}.
- */
- protected BaseTable<?> getTable() {
- return ENTITY_TABLE;
- }
-
- @Override
- protected FilterList constructFilterListBasedOnFields() {
- FilterList list = new FilterList(Operator.MUST_PASS_ONE);
- // Fetch all the columns.
- if (fieldsToRetrieve.contains(Field.ALL) &&
- (confsToRetrieve == null ||
- confsToRetrieve.getFilterList().isEmpty()) &&
- (metricsToRetrieve == null ||
- metricsToRetrieve.getFilterList().isEmpty())) {
- return list;
- }
- FilterList infoColFamilyList = new FilterList();
- // By default fetch everything in INFO column family.
- FamilyFilter infoColumnFamily =
- new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
- infoColFamilyList.addFilter(infoColumnFamily);
- // Events not required.
- if (!fieldsToRetrieve.contains(Field.EVENTS) &&
- !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
- }
- // info not required.
- if (!fieldsToRetrieve.contains(Field.INFO) &&
- !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
- }
- // is related to not required.
- if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
- !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
- }
- // relates to not required.
- if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
- !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
- infoColFamilyList.addFilter(
- new QualifierFilter(CompareOp.NOT_EQUAL,
- new BinaryPrefixComparator(
- EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
- }
- list.addFilter(infoColFamilyList);
- if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
- (confsToRetrieve != null &&
- !confsToRetrieve.getFilterList().isEmpty())) {
- FilterList filterCfg =
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
- if (confsToRetrieve != null &&
- !confsToRetrieve.getFilterList().isEmpty()) {
- filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
- EntityColumnPrefix.CONFIG, confsToRetrieve));
- }
- list.addFilter(filterCfg);
- }
- if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
- (metricsToRetrieve != null &&
- !metricsToRetrieve.getFilterList().isEmpty())) {
- FilterList filterMetrics =
- new FilterList(new FamilyFilter(CompareOp.EQUAL,
- new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
- if (metricsToRetrieve != null &&
- !metricsToRetrieve.getFilterList().isEmpty()) {
- filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
- EntityColumnPrefix.METRIC, metricsToRetrieve));
- }
- list.addFilter(filterMetrics);
- }
- return list;
- }
-
- protected FlowContext lookupFlowContext(String clusterId, String appId,
- Configuration hbaseConf, Connection conn) throws IOException {
- byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
- Get get = new Get(rowKey);
- Result result = appToFlowTable.getResult(hbaseConf, conn, get);
- if (result != null && !result.isEmpty()) {
- return new FlowContext(
- AppToFlowColumn.USER_ID.readResult(result).toString(),
- AppToFlowColumn.FLOW_ID.readResult(result).toString(),
- ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
- } else {
- throw new IOException(
- "Unable to find the context flow ID and flow run ID for clusterId=" +
- clusterId + ", appId=" + appId);
- }
- }
-
- protected static class FlowContext {
- protected final String userId;
- protected final String flowName;
- protected final Long flowRunId;
- public FlowContext(String user, String flowName, Long flowRunId) {
- this.userId = user;
- this.flowName = flowName;
- this.flowRunId = flowRunId;
- }
- }
-
- @Override
- protected void validateParams() {
- Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
- Preconditions.checkNotNull(appId, "appId shouldn't be null");
- Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
- if (singleEntityRead) {
- Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
- }
- }
-
- @Override
- protected void augmentParams(Configuration hbaseConf, Connection conn)
- throws IOException {
- // In reality all three should be null or neither should be null
- if (flowName == null || flowRunId == null || userId == null) {
- FlowContext context =
- lookupFlowContext(clusterId, appId, hbaseConf, conn);
- flowName = context.flowName;
- flowRunId = context.flowRunId;
- userId = context.userId;
- }
- if (fieldsToRetrieve == null) {
- fieldsToRetrieve = EnumSet.noneOf(Field.class);
- }
- if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
- confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
- fieldsToRetrieve.add(Field.CONFIGS);
- }
- if (!fieldsToRetrieve.contains(Field.METRICS) &&
- metricsToRetrieve != null &&
- !metricsToRetrieve.getFilterList().isEmpty()) {
- fieldsToRetrieve.add(Field.METRICS);
- }
- if (!singleEntityRead) {
- if (limit == null || limit < 0) {
- limit = TimelineReader.DEFAULT_LIMIT;
- }
- if (createdTimeBegin == null) {
- createdTimeBegin = DEFAULT_BEGIN_TIME;
- }
- if (createdTimeEnd == null) {
- createdTimeEnd = DEFAULT_END_TIME;
- }
- if (modifiedTimeBegin == null) {
- modifiedTimeBegin = DEFAULT_BEGIN_TIME;
- }
- if (modifiedTimeEnd == null) {
- modifiedTimeEnd = DEFAULT_END_TIME;
- }
- }
- }
-
- @Override
- protected Result getResult(Configuration hbaseConf, Connection conn,
- FilterList filterList) throws IOException {
- byte[] rowKey =
- EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
- entityType, entityId);
- Get get = new Get(rowKey);
- get.setMaxVersions(Integer.MAX_VALUE);
- if (filterList != null && !filterList.getFilters().isEmpty()) {
- get.setFilter(filterList);
- }
- return table.getResult(hbaseConf, conn, get);
- }
-
- @Override
- protected ResultScanner getResults(Configuration hbaseConf,
- Connection conn, FilterList filterList) throws IOException {
- // Scan through part of the table to find the entities belong to one app
- // and one type
- Scan scan = new Scan();
- scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
- clusterId, userId, flowName, flowRunId, appId, entityType));
- scan.setMaxVersions(Integer.MAX_VALUE);
- if (filterList != null && !filterList.getFilters().isEmpty()) {
- scan.setFilter(filterList);
- }
- return table.getResultScanner(hbaseConf, conn, scan);
- }
-
- @Override
- protected TimelineEntity parseEntity(Result result) throws IOException {
- if (result == null || result.isEmpty()) {
- return null;
- }
- TimelineEntity entity = new TimelineEntity();
- String entityType = EntityColumn.TYPE.readResult(result).toString();
- entity.setType(entityType);
- String entityId = EntityColumn.ID.readResult(result).toString();
- entity.setId(entityId);
-
- // fetch created time
- Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
- entity.setCreatedTime(createdTime.longValue());
- if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
- entity.getCreatedTime() > createdTimeEnd)) {
- return null;
- }
-
- // fetch modified time
- Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
- entity.setModifiedTime(modifiedTime.longValue());
- if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
- entity.getModifiedTime() > modifiedTimeEnd)) {
- return null;
- }
-
- // fetch is related to entities
- boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
- readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
- if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
- entity.getIsRelatedToEntities(), isRelatedTo)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
- entity.getIsRelatedToEntities().clear();
- }
- }
-
- // fetch relates to entities
- boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
- readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
- if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
- entity.getRelatesToEntities(), relatesTo)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.RELATES_TO)) {
- entity.getRelatesToEntities().clear();
- }
- }
-
- // fetch info
- boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
- readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
- if (checkInfo &&
- !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.INFO)) {
- entity.getInfo().clear();
- }
- }
-
- // fetch configs
- boolean checkConfigs = configFilters != null && configFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
- readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
- if (checkConfigs && !TimelineStorageUtils.matchFilters(
- entity.getConfigs(), configFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.CONFIGS)) {
- entity.getConfigs().clear();
- }
- }
-
- // fetch events
- boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
- readEvents(entity, result, false);
- if (checkEvents && !TimelineStorageUtils.matchEventFilters(
- entity.getEvents(), eventFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.EVENTS)) {
- entity.getEvents().clear();
- }
- }
-
- // fetch metrics
- boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
- if (fieldsToRetrieve.contains(Field.ALL) ||
- fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
- readMetrics(entity, result, EntityColumnPrefix.METRIC);
- if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
- entity.getMetrics(), metricFilters)) {
- return null;
- }
- if (!fieldsToRetrieve.contains(Field.ALL) &&
- !fieldsToRetrieve.contains(Field.METRICS)) {
- entity.getMetrics().clear();
- }
- }
- return entity;
- }
-
- /**
- * Helper method for reading relationship.
- */
- protected <T> void readRelationship(
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
- boolean isRelatedTo) throws IOException {
- // isRelatedTo and relatesTo are of type Map<String, Set<String>>
- Map<String, Object> columns = prefix.readResults(result);
- for (Map.Entry<String, Object> column : columns.entrySet()) {
- for (String id : Separator.VALUES.splitEncoded(
- column.getValue().toString())) {
- if (isRelatedTo) {
- entity.addIsRelatedToEntity(column.getKey(), id);
- } else {
- entity.addRelatesToEntity(column.getKey(), id);
- }
- }
- }
- }
-
- /**
- * Helper method for reading key-value pairs for either info or config.
- */
- protected <T> void readKeyValuePairs(
- TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
- boolean isConfig) throws IOException {
- // info and configuration are of type Map<String, Object or String>
- Map<String, Object> columns = prefix.readResults(result);
- if (isConfig) {
- for (Map.Entry<String, Object> column : columns.entrySet()) {
- entity.addConfig(column.getKey(), column.getValue().toString());
- }
- } else {
- entity.addInfo(columns);
- }
- }
-
- /**
- * Read events from the entity table or the application table. The column name
- * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
- * if there is no info associated with the event.
- *
- * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
- * schema description.
- */
- protected void readEvents(TimelineEntity entity, Result result,
- boolean isApplication) throws IOException {
- Map<String, TimelineEvent> eventsMap = new HashMap<>();
- Map<?, Object> eventsResult = isApplication ?
- ApplicationColumnPrefix.EVENT.
- readResultsHavingCompoundColumnQualifiers(result) :
- EntityColumnPrefix.EVENT.
- readResultsHavingCompoundColumnQualifiers(result);
- for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
- byte[][] karr = (byte[][])eventResult.getKey();
- // the column name is of the form "eventId=timestamp=infoKey"
- if (karr.length == 3) {
- String id = Bytes.toString(karr[0]);
- long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
- String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
- TimelineEvent event = eventsMap.get(key);
- if (event == null) {
- event = new TimelineEvent();
- event.setId(id);
- event.setTimestamp(ts);
- eventsMap.put(key, event);
- }
- // handle empty info
- String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
- if (infoKey != null) {
- event.addInfo(infoKey, eventResult.getValue());
- }
- } else {
- LOG.warn("incorrectly formatted column name: it will be discarded");
- continue;
- }
- }
- Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
- entity.addEvents(eventsSet);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index 96c5a19..bc48cbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader;
+import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory;
public class HBaseTimelineReaderImpl
extends AbstractService implements TimelineReader {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
deleted file mode 100644
index a26c0c2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-
-/**
- * The base class for reading and deserializing timeline entities from the
- * HBase storage. Different types can be defined for different types of the
- * entities that are being requested.
- */
-abstract class TimelineEntityReader {
- private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
- protected static final long DEFAULT_BEGIN_TIME = 0L;
- protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
-
- protected final boolean singleEntityRead;
-
- protected String userId;
- protected String clusterId;
- protected String flowName;
- protected Long flowRunId;
- protected String appId;
- protected String entityType;
- protected EnumSet<Field> fieldsToRetrieve;
- // used only for a single entity read mode
- protected String entityId;
- // used only for multiple entity read mode
- protected Long limit;
- protected Long createdTimeBegin;
- protected Long createdTimeEnd;
- protected Long modifiedTimeBegin;
- protected Long modifiedTimeEnd;
- protected Map<String, Set<String>> relatesTo;
- protected Map<String, Set<String>> isRelatedTo;
- protected Map<String, Object> infoFilters;
- protected Map<String, String> configFilters;
- protected Set<String> metricFilters;
- protected Set<String> eventFilters;
- protected TimelineFilterList confsToRetrieve;
- protected TimelineFilterList metricsToRetrieve;
-
- /**
- * Main table the entity reader uses.
- */
- protected BaseTable<?> table;
-
- /**
- * Specifies whether keys for this table are sorted in a manner where entities
- * can be retrieved by created time. If true, it will be sufficient to collect
- * the first results as specified by the limit. Otherwise all matched entities
- * will be fetched and then limit applied.
- */
- private boolean sortedKeys = false;
-
- /**
- * Instantiates a reader for multiple-entity reads.
- */
- protected TimelineEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- Long limit, Long createdTimeBegin, Long createdTimeEnd,
- Long modifiedTimeBegin, Long modifiedTimeEnd,
- Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
- Map<String, Object> infoFilters, Map<String, String> configFilters,
- Set<String> metricFilters, Set<String> eventFilters,
- TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
- EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
- this.singleEntityRead = false;
- this.sortedKeys = sortedKeys;
- this.userId = userId;
- this.clusterId = clusterId;
- this.flowName = flowName;
- this.flowRunId = flowRunId;
- this.appId = appId;
- this.entityType = entityType;
- this.fieldsToRetrieve = fieldsToRetrieve;
- this.limit = limit;
- this.createdTimeBegin = createdTimeBegin;
- this.createdTimeEnd = createdTimeEnd;
- this.modifiedTimeBegin = modifiedTimeBegin;
- this.modifiedTimeEnd = modifiedTimeEnd;
- this.relatesTo = relatesTo;
- this.isRelatedTo = isRelatedTo;
- this.infoFilters = infoFilters;
- this.configFilters = configFilters;
- this.metricFilters = metricFilters;
- this.eventFilters = eventFilters;
- this.confsToRetrieve = confsToRetrieve;
- this.metricsToRetrieve = metricsToRetrieve;
-
- this.table = getTable();
- }
-
- /**
- * Instantiates a reader for single-entity reads.
- */
- protected TimelineEntityReader(String userId, String clusterId,
- String flowName, Long flowRunId, String appId, String entityType,
- String entityId, TimelineFilterList confsToRetrieve,
- TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
- this.singleEntityRead = true;
- this.userId = userId;
- this.clusterId = clusterId;
- this.flowName = flowName;
- this.flowRunId = flowRunId;
- this.appId = appId;
- this.entityType = entityType;
- this.fieldsToRetrieve = fieldsToRetrieve;
- this.entityId = entityId;
- this.confsToRetrieve = confsToRetrieve;
- this.metricsToRetrieve = metricsToRetrieve;
-
- this.table = getTable();
- }
-
- /**
- * Creates a {@link FilterList} based on fields, confs and metrics to
- * retrieve. This filter list will be set in Scan/Get objects to trim down
- * results fetched from HBase back-end storage.
- * @return a {@link FilterList} object.
- */
- protected abstract FilterList constructFilterListBasedOnFields();
-
- /**
- * Reads and deserializes a single timeline entity from the HBase storage.
- */
- public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
- throws IOException {
- validateParams();
- augmentParams(hbaseConf, conn);
-
- FilterList filterList = constructFilterListBasedOnFields();
- Result result = getResult(hbaseConf, conn, filterList);
- if (result == null || result.isEmpty()) {
- // Could not find a matching row.
- LOG.info("Cannot find matching entity of type " + entityType);
- return null;
- }
- return parseEntity(result);
- }
-
- /**
- * Reads and deserializes a set of timeline entities from the HBase storage.
- * It goes through all the results available, and returns the number of
- * entries as specified in the limit in the entity's natural sort order.
- */
- public Set<TimelineEntity> readEntities(Configuration hbaseConf,
- Connection conn) throws IOException {
- validateParams();
- augmentParams(hbaseConf, conn);
-
- NavigableSet<TimelineEntity> entities = new TreeSet<>();
- FilterList filterList = constructFilterListBasedOnFields();
- ResultScanner results = getResults(hbaseConf, conn, filterList);
- try {
- for (Result result : results) {
- TimelineEntity entity = parseEntity(result);
- if (entity == null) {
- continue;
- }
- entities.add(entity);
- if (!sortedKeys) {
- if (entities.size() > limit) {
- entities.pollLast();
- }
- } else {
- if (entities.size() == limit) {
- break;
- }
- }
- }
- return entities;
- } finally {
- results.close();
- }
- }
-
- /**
- * Returns the main table to be used by the entity reader.
- */
- protected abstract BaseTable<?> getTable();
-
- /**
- * Validates the required parameters to read the entities.
- */
- protected abstract void validateParams();
-
- /**
- * Sets certain parameters to defaults if the values are not provided.
- */
- protected abstract void augmentParams(Configuration hbaseConf,
- Connection conn) throws IOException;
-
- /**
- * Fetches a {@link Result} instance for a single-entity read.
- *
- * @return the {@link Result} instance or null if no such record is found.
- */
- protected abstract Result getResult(Configuration hbaseConf, Connection conn,
- FilterList filterList) throws IOException;
-
- /**
- * Fetches a {@link ResultScanner} for a multi-entity read.
- */
- protected abstract ResultScanner getResults(Configuration hbaseConf,
- Connection conn, FilterList filterList) throws IOException;
-
- /**
- * Given a {@link Result} instance, deserializes and creates a
- * {@link TimelineEntity}.
- *
- * @return the {@link TimelineEntity} instance, or null if the {@link Result}
- * is null or empty.
- */
- protected abstract TimelineEntity parseEntity(Result result)
- throws IOException;
-
- /**
- * Helper method for reading and deserializing {@link TimelineMetric} objects
- * using the specified column prefix. The timeline metrics then are added to
- * the given timeline entity.
- */
- protected void readMetrics(TimelineEntity entity, Result result,
- ColumnPrefix<?> columnPrefix) throws IOException {
- NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
- columnPrefix.readResultsWithTimestamps(result);
- for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
- metricsResult.entrySet()) {
- TimelineMetric metric = new TimelineMetric();
- metric.setId(metricResult.getKey());
- // Simply assume that if the value set contains more than 1 elements, the
- // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
- metric.setType(metricResult.getValue().size() > 1 ?
- TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
- metric.addValues(metricResult.getValue());
- entity.addMetric(metric);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/63d90992/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
deleted file mode 100644
index 36ed4ca..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-
-/**
- * Factory methods for instantiating a timeline entity reader.
- */
-class TimelineEntityReaderFactory {
- /**
- * Creates a timeline entity reader instance for reading a single entity with
- * the specified input.
- */
- public static TimelineEntityReader createSingleEntityReader(String userId,
- String clusterId, String flowName, Long flowRunId, String appId,
- String entityType, String entityId, TimelineFilterList confs,
- TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) {
- // currently the types that are handled separate from the generic entity
- // table are application, flow run, and flow activity entities
- if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
- return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
- appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
- } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
- return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
- appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
- } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
- return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
- appId, entityType, entityId, fieldsToRetrieve);
- } else {
- // assume we're dealing with a generic entity read
- return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
- appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
- }
- }
-
- /**
- * Creates a timeline entity reader instance for reading set of entities with
- * the specified input and predicates.
- */
- public static TimelineEntityReader createMultipleEntitiesReader(String userId,
- String clusterId, String flowName, Long flowRunId, String appId,
- String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
- Long modifiedTimeBegin, Long modifiedTimeEnd,
- Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
- Map<String, Object> infoFilters, Map<String, String> configFilters,
- Set<String> metricFilters, Set<String> eventFilters,
- TimelineFilterList confs, TimelineFilterList metrics,
- EnumSet<Field> fieldsToRetrieve) {
- // currently the types that are handled separate from the generic entity
- // table are application, flow run, and flow activity entities
- if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
- return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId,
- appId, entityType, limit, createdTimeBegin, createdTimeEnd,
- modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters, confs,
- metrics, fieldsToRetrieve);
- } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
- return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId,
- appId, entityType, limit, createdTimeBegin, createdTimeEnd,
- modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters,
- fieldsToRetrieve);
- } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
- return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
- appId, entityType, limit, createdTimeBegin, createdTimeEnd,
- modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters, confs,
- metrics, fieldsToRetrieve);
- } else {
- // assume we're dealing with a generic entity read
- return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
- appId, entityType, limit, createdTimeBegin, createdTimeEnd,
- modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
- infoFilters, configFilters, metricFilters, eventFilters, confs,
- metrics, fieldsToRetrieve, false);
- }
- }
-}