Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2018/02/23 19:44:50 UTC
[09/50] [abbrv] hadoop git commit: YARN-7919. Refactor
timelineservice-hbase module into submodules. Contributed by Haibo Chen.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
deleted file mode 100644
index f938185..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ /dev/null
@@ -1,593 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This implements an HBase-based backend for storing timeline entity
- * information. It writes to multiple tables in the backend.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class HBaseTimelineWriterImpl extends AbstractService implements
- TimelineWriter {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(HBaseTimelineWriterImpl.class);
-
- private Connection conn;
- private TypedBufferedMutator<EntityTable> entityTable;
- private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
- private TypedBufferedMutator<ApplicationTable> applicationTable;
- private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
- private TypedBufferedMutator<FlowRunTable> flowRunTable;
- private TypedBufferedMutator<SubApplicationTable> subApplicationTable;
-
- /**
-   * Used to convert string key components to and from the storage format.
- */
- private final KeyConverter<String> stringKeyConverter =
- new StringKeyConverter();
-
- /**
-   * Used to convert Long key components to and from the storage format.
- */
- private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
-
- private enum Tables {
- APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE
- };
-
- public HBaseTimelineWriterImpl() {
- super(HBaseTimelineWriterImpl.class.getName());
- }
-
- /**
-   * Initializes the HBase connection and the buffered mutators for all of
-   * the backend tables.
- */
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- Configuration hbaseConf =
- HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
- conn = ConnectionFactory.createConnection(hbaseConf);
- entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
- appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
- applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
- flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
- flowActivityTable =
- new FlowActivityTable().getTableMutator(hbaseConf, conn);
- subApplicationTable =
- new SubApplicationTable().getTableMutator(hbaseConf, conn);
-
- UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
- UserGroupInformation.getLoginUser() :
- UserGroupInformation.getCurrentUser();
- LOG.info("Initialized HBaseTimelineWriterImpl UGI to " + ugi);
- }
-
- /**
-   * Stores all the information from the given TimelineEntities in the
-   * timeline store.
- */
- @Override
- public TimelineWriteResponse write(TimelineCollectorContext context,
- TimelineEntities data, UserGroupInformation callerUgi)
- throws IOException {
-
- TimelineWriteResponse putStatus = new TimelineWriteResponse();
-
- String clusterId = context.getClusterId();
- String userId = context.getUserId();
- String flowName = context.getFlowName();
- String flowVersion = context.getFlowVersion();
- long flowRunId = context.getFlowRunId();
- String appId = context.getAppId();
- String subApplicationUser = callerUgi.getShortUserName();
-
- // defensive coding to avoid NPE during row key construction
- if ((flowName == null) || (appId == null) || (clusterId == null)
- || (userId == null)) {
- LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + appId
- + " userId=" + userId + " clusterId=" + clusterId
- + " . Not proceeding with writing to hbase");
- return putStatus;
- }
-
- for (TimelineEntity te : data.getEntities()) {
-
- // a set can have at most 1 null
- if (te == null) {
- continue;
- }
-
- // if the entity is the application, the destination is the application
- // table
- boolean isApplication = ApplicationEntity.isApplicationEntity(te);
- byte[] rowKey;
- if (isApplication) {
- ApplicationRowKey applicationRowKey =
- new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
- appId);
- rowKey = applicationRowKey.getRowKey();
- store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE);
- } else {
- EntityRowKey entityRowKey =
- new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
- te.getType(), te.getIdPrefix(), te.getId());
- rowKey = entityRowKey.getRowKey();
- store(rowKey, te, flowVersion, Tables.ENTITY_TABLE);
- }
-
- if (!isApplication && !userId.equals(subApplicationUser)) {
- SubApplicationRowKey subApplicationRowKey =
- new SubApplicationRowKey(subApplicationUser, clusterId,
- te.getType(), te.getIdPrefix(), te.getId(), userId);
- rowKey = subApplicationRowKey.getRowKey();
- store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE);
- }
-
- if (isApplication) {
- TimelineEvent event =
- ApplicationEntity.getApplicationEvent(te,
- ApplicationMetricsConstants.CREATED_EVENT_TYPE);
- FlowRunRowKey flowRunRowKey =
- new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
- if (event != null) {
- onApplicationCreated(flowRunRowKey, clusterId, appId, userId,
- flowVersion, te, event.getTimestamp());
- }
- // if it's an application entity, store metrics
- storeFlowMetricsAppRunning(flowRunRowKey, appId, te);
-        // if the application has finished, store its finish time and write
-        // the final values of all metrics
- event = ApplicationEntity.getApplicationEvent(te,
- ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
- if (event != null) {
- onApplicationFinished(flowRunRowKey, flowVersion, appId, te,
- event.getTimestamp());
- }
- }
- }
- return putStatus;
- }
-
- private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
- String clusterId, String appId, String userId, String flowVersion,
- TimelineEntity te, long appCreatedTimeStamp)
- throws IOException {
-
- String flowName = flowRunRowKey.getFlowName();
- Long flowRunId = flowRunRowKey.getFlowRunId();
-
- // store in App to flow table
- AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId);
- byte[] rowKey = appToFlowRowKey.getRowKey();
- AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId,
- null, flowName);
- AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId,
- null, flowRunId);
- AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId, null,
- userId);
-
- // store in flow run table
- storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
-
- // store in flow activity table
- byte[] flowActivityRowKeyBytes =
- new FlowActivityRowKey(flowRunRowKey.getClusterId(),
- appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
- .getRowKey();
- byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
- FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes,
- flowActivityTable, qualifier, null, flowVersion,
- AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
- }
-
- /*
-   * Updates the {@link FlowRunTable} with Application Created information.
- */
- private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
- String appId, TimelineEntity te) throws IOException {
- byte[] rowKey = flowRunRowKey.getRowKey();
- FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
- te.getCreatedTime(),
- AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
- }
-
-
- /*
-   * Updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
- * application has finished
- */
- private void onApplicationFinished(FlowRunRowKey flowRunRowKey,
- String flowVersion, String appId, TimelineEntity te,
- long appFinishedTimeStamp) throws IOException {
- // store in flow run table
- storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te,
- appFinishedTimeStamp);
-
- // indicate in the flow activity table that the app has finished
- byte[] rowKey =
- new FlowActivityRowKey(flowRunRowKey.getClusterId(),
- appFinishedTimeStamp, flowRunRowKey.getUserId(),
- flowRunRowKey.getFlowName()).getRowKey();
- byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
- FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
- null, flowVersion,
- AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
- }
-
- /*
- * Update the {@link FlowRunTable} with Application Finished information
- */
- private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey,
- String appId, TimelineEntity te, long appFinishedTimeStamp)
- throws IOException {
- byte[] rowKey = flowRunRowKey.getRowKey();
- Attribute attributeAppId =
- AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
- FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
- appFinishedTimeStamp, attributeAppId);
-
- // store the final value of metrics since application has finished
- Set<TimelineMetric> metrics = te.getMetrics();
- if (metrics != null) {
- storeFlowMetrics(rowKey, metrics, attributeAppId,
- AggregationOperation.SUM_FINAL.getAttribute());
- }
- }
-
- /*
- * Updates the {@link FlowRunTable} with Application Metrics
- */
- private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey,
- String appId, TimelineEntity te) throws IOException {
- Set<TimelineMetric> metrics = te.getMetrics();
- if (metrics != null) {
- byte[] rowKey = flowRunRowKey.getRowKey();
- storeFlowMetrics(rowKey, metrics,
- AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
- AggregationOperation.SUM.getAttribute());
- }
- }
-
- private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
- Attribute... attributes) throws IOException {
- for (TimelineMetric metric : metrics) {
- byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId());
- Map<Long, Number> timeseries = metric.getValues();
- for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
- Long timestamp = timeseriesEntry.getKey();
- FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
- metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
- attributes);
- }
- }
- }
-
- /**
-   * Stores the relations from the {@linkplain TimelineEntity} object.
- */
- private <T> void storeRelations(byte[] rowKey,
- Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix,
- TypedBufferedMutator<T> table) throws IOException {
- if (connectedEntities != null) {
- for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities
- .entrySet()) {
- // id3?id4?id5
- String compoundValue =
- Separator.VALUES.joinEncoded(connectedEntity.getValue());
- columnPrefix.store(rowKey, table,
- stringKeyConverter.encode(connectedEntity.getKey()), null,
- compoundValue);
- }
- }
- }
-
- /**
- * Stores information from the {@linkplain TimelineEntity} object.
- */
- private void store(byte[] rowKey, TimelineEntity te,
- String flowVersion,
- Tables table) throws IOException {
- switch (table) {
- case APPLICATION_TABLE:
- ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
- ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
- te.getCreatedTime());
- ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null,
- flowVersion);
- storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO,
- applicationTable);
- storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC,
- applicationTable);
- storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT,
- applicationTable);
- storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG,
- applicationTable);
- storeRelations(rowKey, te.getIsRelatedToEntities(),
- ApplicationColumnPrefix.IS_RELATED_TO, applicationTable);
- storeRelations(rowKey, te.getRelatesToEntities(),
- ApplicationColumnPrefix.RELATES_TO, applicationTable);
- break;
- case ENTITY_TABLE:
- EntityColumn.ID.store(rowKey, entityTable, null, te.getId());
- EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType());
- EntityColumn.CREATED_TIME.store(rowKey, entityTable, null,
- te.getCreatedTime());
- EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion);
- storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO,
- entityTable);
- storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC,
- entityTable);
- storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT,
- entityTable);
- storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG,
- entityTable);
- storeRelations(rowKey, te.getIsRelatedToEntities(),
- EntityColumnPrefix.IS_RELATED_TO, entityTable);
- storeRelations(rowKey, te.getRelatesToEntities(),
- EntityColumnPrefix.RELATES_TO, entityTable);
- break;
- case SUBAPPLICATION_TABLE:
- SubApplicationColumn.ID.store(rowKey, subApplicationTable, null,
- te.getId());
- SubApplicationColumn.TYPE.store(rowKey, subApplicationTable, null,
- te.getType());
- SubApplicationColumn.CREATED_TIME.store(rowKey, subApplicationTable, null,
- te.getCreatedTime());
- SubApplicationColumn.FLOW_VERSION.store(rowKey, subApplicationTable, null,
- flowVersion);
- storeInfo(rowKey, te.getInfo(), flowVersion,
- SubApplicationColumnPrefix.INFO, subApplicationTable);
- storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC,
- subApplicationTable);
- storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT,
- subApplicationTable);
- storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG,
- subApplicationTable);
- storeRelations(rowKey, te.getIsRelatedToEntities(),
- SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable);
- storeRelations(rowKey, te.getRelatesToEntities(),
- SubApplicationColumnPrefix.RELATES_TO, subApplicationTable);
- break;
- default:
- LOG.info("Invalid table name provided.");
- break;
- }
- }
-
- /**
-   * Stores the info key/value map from the {@linkplain TimelineEntity}.
- */
- private <T> void storeInfo(byte[] rowKey, Map<String, Object> info,
- String flowVersion, ColumnPrefix<T> columnPrefix,
- TypedBufferedMutator<T> table) throws IOException {
- if (info != null) {
- for (Map.Entry<String, Object> entry : info.entrySet()) {
- columnPrefix.store(rowKey, table,
- stringKeyConverter.encode(entry.getKey()), null, entry.getValue());
- }
- }
- }
-
- /**
-   * Stores the configuration map from the {@linkplain TimelineEntity}.
- */
- private <T> void storeConfig(byte[] rowKey, Map<String, String> config,
- ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
- throws IOException {
- if (config != null) {
- for (Map.Entry<String, String> entry : config.entrySet()) {
- byte[] configKey = stringKeyConverter.encode(entry.getKey());
- columnPrefix.store(rowKey, table, configKey, null, entry.getValue());
- }
- }
- }
-
- /**
-   * Stores the {@linkplain TimelineMetric} information from the
-   * {@linkplain TimelineEntity} object.
- */
- private <T> void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
- ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
- throws IOException {
- if (metrics != null) {
- for (TimelineMetric metric : metrics) {
- byte[] metricColumnQualifier =
- stringKeyConverter.encode(metric.getId());
- Map<Long, Number> timeseries = metric.getValues();
- for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
- Long timestamp = timeseriesEntry.getKey();
- columnPrefix.store(rowKey, table, metricColumnQualifier, timestamp,
- timeseriesEntry.getValue());
- }
- }
- }
- }
-
- /**
-   * Stores the given set of {@linkplain TimelineEvent} objects.
- */
- private <T> void storeEvents(byte[] rowKey, Set<TimelineEvent> events,
- ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table)
- throws IOException {
- if (events != null) {
- for (TimelineEvent event : events) {
- if (event != null) {
- String eventId = event.getId();
- if (eventId != null) {
- long eventTimestamp = event.getTimestamp();
- // if the timestamp is not set, use the current timestamp
- if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) {
- LOG.warn("timestamp is not set for event " + eventId +
- "! Using the current timestamp");
- eventTimestamp = System.currentTimeMillis();
- }
- Map<String, Object> eventInfo = event.getInfo();
- if ((eventInfo == null) || (eventInfo.size() == 0)) {
- byte[] columnQualifierBytes =
- new EventColumnName(eventId, eventTimestamp, null)
- .getColumnQualifier();
- columnPrefix.store(rowKey, table, columnQualifierBytes, null,
- Separator.EMPTY_BYTES);
- } else {
- for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
- // eventId=infoKey
- byte[] columnQualifierBytes =
- new EventColumnName(eventId, eventTimestamp, info.getKey())
- .getColumnQualifier();
- columnPrefix.store(rowKey, table, columnQualifierBytes, null,
- info.getValue());
- } // for info: eventInfo
- }
- }
- }
- } // event : events
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage
- * .TimelineWriter#aggregate
- * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity,
- * org.apache
- * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack)
- */
- @Override
- public TimelineWriteResponse aggregate(TimelineEntity data,
- TimelineAggregationTrack track) throws IOException {
- return null;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush
- * ()
- */
- @Override
- public void flush() throws IOException {
- // flush all buffered mutators
- entityTable.flush();
- appToFlowTable.flush();
- applicationTable.flush();
- flowRunTable.flush();
- flowActivityTable.flush();
- subApplicationTable.flush();
- }
-
- /**
-   * Closes the HBase connections. The close APIs perform flushing and
-   * release any resources held.
- */
- @Override
- protected void serviceStop() throws Exception {
- if (entityTable != null) {
- LOG.info("closing the entity table");
- // The close API performs flushing and releases any resources held
- entityTable.close();
- }
- if (appToFlowTable != null) {
- LOG.info("closing the app_flow table");
- // The close API performs flushing and releases any resources held
- appToFlowTable.close();
- }
- if (applicationTable != null) {
- LOG.info("closing the application table");
- applicationTable.close();
- }
- if (flowRunTable != null) {
- LOG.info("closing the flow run table");
- // The close API performs flushing and releases any resources held
- flowRunTable.close();
- }
- if (flowActivityTable != null) {
- LOG.info("closing the flowActivityTable table");
- // The close API performs flushing and releases any resources held
- flowActivityTable.close();
- }
-    if (subApplicationTable != null) {
-      LOG.info("closing the sub application table");
-      subApplicationTable.close();
-    }
- if (conn != null) {
- LOG.info("closing the hbase Connection");
- conn.close();
- }
- super.serviceStop();
- }
-}
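
For orientation, here is a minimal usage sketch of the writer deleted above. It is not part of this commit: the init()/start()/stop() calls follow the standard Hadoop AbstractService lifecycle, and the TimelineCollectorContext constructor order shown here is an assumption inferred from the getters used in write(), so treat the entity values and constructor as illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
    import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;

    public class WriterUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl();
        // init() triggers serviceInit(), which opens the HBase connection
        // and the buffered mutators for all six tables.
        writer.init(conf);
        writer.start();

        TimelineEntity entity = new TimelineEntity();
        entity.setType("YARN_CONTAINER");            // illustrative values
        entity.setId("container_1_0001_01_000001");
        entity.setCreatedTime(System.currentTimeMillis());
        TimelineEntities entities = new TimelineEntities();
        entities.addEntity(entity);

        // Assumed constructor order: clusterId, userId, flowName,
        // flowVersion, flowRunId, appId (mirrors the getters in write()).
        TimelineCollectorContext context = new TimelineCollectorContext(
            "cluster1", "alice", "flow_name", "v1", 1L, "application_1_0001");
        writer.write(context, entities, UserGroupInformation.getCurrentUser());
        writer.flush();   // flush all buffered mutators
        writer.stop();    // serviceStop(): closes tables and the connection
      }
    }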
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
deleted file mode 100644
index c9f7cec..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This creates the schema for an HBase-based backend for storing application
- * timeline information.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public final class TimelineSchemaCreator {
- private TimelineSchemaCreator() {
- }
-
-  static final String NAME = TimelineSchemaCreator.class.getSimpleName();
- private static final Logger LOG =
- LoggerFactory.getLogger(TimelineSchemaCreator.class);
- private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
- private static final String APP_METRICS_TTL_OPTION_SHORT = "ma";
- private static final String SUB_APP_METRICS_TTL_OPTION_SHORT = "msa";
- private static final String APP_TABLE_NAME_SHORT = "a";
- private static final String SUB_APP_TABLE_NAME_SHORT = "sa";
- private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
- private static final String ENTITY_METRICS_TTL_OPTION_SHORT = "me";
- private static final String ENTITY_TABLE_NAME_SHORT = "e";
- private static final String HELP_SHORT = "h";
- private static final String CREATE_TABLES_SHORT = "c";
-
- public static void main(String[] args) throws Exception {
-
- LOG.info("Starting the schema creation");
- Configuration hbaseConf =
- HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(
- new YarnConfiguration());
- // Grab input args and allow for -Dxyz style arguments
- String[] otherArgs = new GenericOptionsParser(hbaseConf, args)
- .getRemainingArgs();
-
- // Grab the arguments we're looking for.
- CommandLine commandLine = parseArgs(otherArgs);
-
- if (commandLine.hasOption(HELP_SHORT)) {
- // -help option has the highest precedence
- printUsage();
- } else if (commandLine.hasOption(CREATE_TABLES_SHORT)) {
- // Grab the entityTableName argument
- String entityTableName = commandLine.getOptionValue(
- ENTITY_TABLE_NAME_SHORT);
- if (StringUtils.isNotBlank(entityTableName)) {
- hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName);
- }
- // Grab the entity metrics TTL
- String entityTableMetricsTTL = commandLine.getOptionValue(
- ENTITY_METRICS_TTL_OPTION_SHORT);
- if (StringUtils.isNotBlank(entityTableMetricsTTL)) {
- int entityMetricsTTL = Integer.parseInt(entityTableMetricsTTL);
- new EntityTable().setMetricsTTL(entityMetricsTTL, hbaseConf);
- }
- // Grab the appToflowTableName argument
- String appToflowTableName = commandLine.getOptionValue(
- APP_TO_FLOW_TABLE_NAME_SHORT);
- if (StringUtils.isNotBlank(appToflowTableName)) {
- hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName);
- }
- // Grab the applicationTableName argument
- String applicationTableName = commandLine.getOptionValue(
- APP_TABLE_NAME_SHORT);
- if (StringUtils.isNotBlank(applicationTableName)) {
- hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME,
- applicationTableName);
- }
- // Grab the application metrics TTL
- String applicationTableMetricsTTL = commandLine.getOptionValue(
- APP_METRICS_TTL_OPTION_SHORT);
- if (StringUtils.isNotBlank(applicationTableMetricsTTL)) {
- int appMetricsTTL = Integer.parseInt(applicationTableMetricsTTL);
- new ApplicationTable().setMetricsTTL(appMetricsTTL, hbaseConf);
- }
-
- // Grab the subApplicationTableName argument
- String subApplicationTableName = commandLine.getOptionValue(
- SUB_APP_TABLE_NAME_SHORT);
- if (StringUtils.isNotBlank(subApplicationTableName)) {
- hbaseConf.set(SubApplicationTable.TABLE_NAME_CONF_NAME,
- subApplicationTableName);
- }
- // Grab the subApplication metrics TTL
- String subApplicationTableMetricsTTL = commandLine
- .getOptionValue(SUB_APP_METRICS_TTL_OPTION_SHORT);
- if (StringUtils.isNotBlank(subApplicationTableMetricsTTL)) {
- int subAppMetricsTTL = Integer.parseInt(subApplicationTableMetricsTTL);
- new SubApplicationTable().setMetricsTTL(subAppMetricsTTL, hbaseConf);
- }
-
- // create all table schemas in hbase
- final boolean skipExisting = commandLine.hasOption(
- SKIP_EXISTING_TABLE_OPTION_SHORT);
- createAllSchemas(hbaseConf, skipExisting);
- } else {
- // print usage information if -create is not specified
- printUsage();
- }
- }
-
- /**
-   * Parses command-line arguments.
-   *
-   * @param args command-line arguments passed to the program.
-   * @return the parsed command line.
-   * @throws ParseException if the arguments cannot be parsed.
- */
- private static CommandLine parseArgs(String[] args) throws ParseException {
- Options options = new Options();
-
- // Input
- Option o = new Option(HELP_SHORT, "help", false, "print help information");
- o.setRequired(false);
- options.addOption(o);
-
- o = new Option(CREATE_TABLES_SHORT, "create", false,
- "a mandatory option to create hbase tables");
- o.setRequired(false);
- options.addOption(o);
-
- o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true,
- "entity table name");
- o.setArgName("entityTableName");
- o.setRequired(false);
- options.addOption(o);
-
- o = new Option(ENTITY_METRICS_TTL_OPTION_SHORT, "entityMetricsTTL", true,
- "TTL for metrics column family");
- o.setArgName("entityMetricsTTL");
- o.setRequired(false);
- options.addOption(o);
-
- o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true,
- "app to flow table name");
- o.setArgName("appToflowTableName");
- o.setRequired(false);
- options.addOption(o);
-
- o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true,
- "application table name");
- o.setArgName("applicationTableName");
- o.setRequired(false);
- options.addOption(o);
-
- o = new Option(APP_METRICS_TTL_OPTION_SHORT, "applicationMetricsTTL", true,
- "TTL for metrics column family");
- o.setArgName("applicationMetricsTTL");
- o.setRequired(false);
- options.addOption(o);
-
- o = new Option(SUB_APP_TABLE_NAME_SHORT, "subApplicationTableName", true,
- "subApplication table name");
- o.setArgName("subApplicationTableName");
- o.setRequired(false);
- options.addOption(o);
-
- o = new Option(SUB_APP_METRICS_TTL_OPTION_SHORT, "subApplicationMetricsTTL",
- true, "TTL for metrics column family");
- o.setArgName("subApplicationMetricsTTL");
- o.setRequired(false);
- options.addOption(o);
-
- // Options without an argument
- // No need to set arg name since we do not need an argument here
- o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable",
- false, "skip existing Hbase tables and continue to create new tables");
- o.setRequired(false);
- options.addOption(o);
-
- CommandLineParser parser = new PosixParser();
- CommandLine commandLine = null;
- try {
- commandLine = parser.parse(options, args);
- } catch (Exception e) {
- LOG.error("ERROR: " + e.getMessage() + "\n");
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(NAME + " ", options, true);
- System.exit(-1);
- }
-
- return commandLine;
- }
-
- private static void printUsage() {
- StringBuilder usage = new StringBuilder("Command Usage: \n");
- usage.append("TimelineSchemaCreator [-help] Display help info" +
- " for all commands. Or\n");
- usage.append("TimelineSchemaCreator -create [OPTIONAL_OPTIONS]" +
- " Create hbase tables.\n\n");
- usage.append("The Optional options for creating tables include: \n");
- usage.append("[-entityTableName <Entity Table Name>] " +
- "The name of the Entity table\n");
- usage.append("[-entityMetricsTTL <Entity Table Metrics TTL>]" +
- " TTL for metrics in the Entity table\n");
- usage.append("[-appToflowTableName <AppToflow Table Name>]" +
- " The name of the AppToFlow table\n");
- usage.append("[-applicationTableName <Application Table Name>]" +
- " The name of the Application table\n");
- usage.append("[-applicationMetricsTTL <Application Table Metrics TTL>]" +
- " TTL for metrics in the Application table\n");
- usage.append("[-subApplicationTableName <SubApplication Table Name>]" +
- " The name of the SubApplication table\n");
- usage.append("[-subApplicationMetricsTTL " +
- " <SubApplication Table Metrics TTL>]" +
- " TTL for metrics in the SubApplication table\n");
- usage.append("[-skipExistingTable] Whether to skip existing" +
- " hbase tables\n");
- System.out.println(usage.toString());
- }
-
- /**
- * Create all table schemas and log success or exception if failed.
- * @param hbaseConf the hbase configuration to create tables with
- * @param skipExisting whether to skip existing hbase tables
- */
- private static void createAllSchemas(Configuration hbaseConf,
- boolean skipExisting) {
- List<Exception> exceptions = new ArrayList<>();
- try {
- if (skipExisting) {
- LOG.info("Will skip existing tables and continue on htable creation "
- + "exceptions!");
- }
- createAllTables(hbaseConf, skipExisting);
- LOG.info("Successfully created HBase schema. ");
- } catch (IOException e) {
- LOG.error("Error in creating hbase tables: ", e);
- exceptions.add(e);
- }
-
-    if (!exceptions.isEmpty()) {
- LOG.warn("Schema creation finished with the following exceptions");
- for (Exception e : exceptions) {
- LOG.warn(e.getMessage());
- }
- System.exit(-1);
- } else {
- LOG.info("Schema creation finished successfully");
- }
- }
-
- @VisibleForTesting
- public static void createAllTables(Configuration hbaseConf,
- boolean skipExisting) throws IOException {
-
- Connection conn = null;
- try {
- conn = ConnectionFactory.createConnection(hbaseConf);
- Admin admin = conn.getAdmin();
- if (admin == null) {
- throw new IOException("Cannot create table since admin is null");
- }
- try {
- new EntityTable().createTable(admin, hbaseConf);
- } catch (IOException e) {
- if (skipExisting) {
- LOG.warn("Skip and continue on: " + e.getMessage());
- } else {
- throw e;
- }
- }
- try {
- new AppToFlowTable().createTable(admin, hbaseConf);
- } catch (IOException e) {
- if (skipExisting) {
- LOG.warn("Skip and continue on: " + e.getMessage());
- } else {
- throw e;
- }
- }
- try {
- new ApplicationTable().createTable(admin, hbaseConf);
- } catch (IOException e) {
- if (skipExisting) {
- LOG.warn("Skip and continue on: " + e.getMessage());
- } else {
- throw e;
- }
- }
- try {
- new FlowRunTable().createTable(admin, hbaseConf);
- } catch (IOException e) {
- if (skipExisting) {
- LOG.warn("Skip and continue on: " + e.getMessage());
- } else {
- throw e;
- }
- }
- try {
- new FlowActivityTable().createTable(admin, hbaseConf);
- } catch (IOException e) {
- if (skipExisting) {
- LOG.warn("Skip and continue on: " + e.getMessage());
- } else {
- throw e;
- }
- }
- try {
- new SubApplicationTable().createTable(admin, hbaseConf);
- } catch (IOException e) {
- if (skipExisting) {
- LOG.warn("Skip and continue on: " + e.getMessage());
- } else {
- throw e;
- }
- }
- } finally {
- if (conn != null) {
- conn.close();
- }
- }
- }
-
-
-}
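
As a side note, the @VisibleForTesting createAllTables() entry point above can also be driven programmatically. A short sketch, under the assumption that an HBase cluster is reachable through the resolved configuration; the table-name override mirrors what the -entityTableName option does, and the table name itself is illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
    import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;

    public class SchemaCreationSketch {
      public static void main(String[] args) throws Exception {
        // Resolve the HBase configuration the same way main() above does.
        Configuration hbaseConf = HBaseTimelineStorageUtils
            .getTimelineServiceHBaseConf(new YarnConfiguration());
        // Equivalent of passing "-entityTableName dev.timelineservice.entity".
        hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME,
            "dev.timelineservice.entity");
        // true == skip tables that already exist instead of failing.
        TimelineSchemaCreator.createAllTables(hbaseConf, true);
      }
    }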
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
deleted file mode 100644
index 00eaa7e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
-
-/**
- * Identifies fully qualified columns for the {@link ApplicationTable}.
- */
-public enum ApplicationColumn implements Column<ApplicationTable> {
-
- /**
- * App id.
- */
- ID(ApplicationColumnFamily.INFO, "id"),
-
- /**
- * When the application was created.
- */
- CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
- new LongConverter()),
-
- /**
- * The version of the flow that this app belongs to.
- */
- FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version");
-
- private final ColumnHelper<ApplicationTable> column;
- private final ColumnFamily<ApplicationTable> columnFamily;
- private final String columnQualifier;
- private final byte[] columnQualifierBytes;
-
- private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
- String columnQualifier) {
- this(columnFamily, columnQualifier, GenericConverter.getInstance());
- }
-
- private ApplicationColumn(ColumnFamily<ApplicationTable> columnFamily,
- String columnQualifier, ValueConverter converter) {
- this.columnFamily = columnFamily;
- this.columnQualifier = columnQualifier;
- // Future-proof by ensuring the right column prefix hygiene.
- this.columnQualifierBytes =
- Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
- this.column = new ColumnHelper<ApplicationTable>(columnFamily, converter);
- }
-
- /**
-   * @return the column qualifier as a String.
- */
- private String getColumnQualifier() {
- return columnQualifier;
- }
-
- public void store(byte[] rowKey,
- TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
- Object inputValue, Attribute... attributes) throws IOException {
- column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
- inputValue, attributes);
- }
-
- public Object readResult(Result result) throws IOException {
- return column.readResult(result, columnQualifierBytes);
- }
-
- @Override
- public byte[] getColumnQualifierBytes() {
- return columnQualifierBytes.clone();
- }
-
- @Override
- public byte[] getColumnFamilyBytes() {
- return columnFamily.getBytes();
- }
-
- @Override
- public ValueConverter getValueConverter() {
- return column.getValueConverter();
- }
-
-}
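
To make the enum's role concrete, a small sketch of a store/read round trip with these fully qualified columns. The mutator and Result are assumed to be supplied by the surrounding storage code (as in HBaseTimelineWriterImpl above); the store() and readResult() calls match their use in this commit.

    package org.apache.hadoop.yarn.server.timelineservice.storage.application;

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;

    class ApplicationColumnSketch {
      // Writes the fixed columns for one application row, then reads the
      // created-time cell back from a Result fetched elsewhere (Get/Scan).
      static Object roundTrip(byte[] rowKey,
          TypedBufferedMutator<ApplicationTable> applicationTable,
          Result result, TimelineEntity te) throws IOException {
        ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId());
        ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null,
            te.getCreatedTime());
        return ApplicationColumn.CREATED_TIME.readResult(result);
      }
    }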
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java
deleted file mode 100644
index 97e5f7b..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-
-/**
- * Represents the application table column families.
- */
-public enum ApplicationColumnFamily implements ColumnFamily<ApplicationTable> {
-
- /**
-   * The info column family houses known columns, specifically the ones
-   * included in column family filters.
- */
- INFO("i"),
-
- /**
-   * Configurations are in a separate column family for two reasons: (a) the
-   * config values can be very large and (b) we expect config values to be
-   * accessed separately from the metrics and info columns.
- */
- CONFIGS("c"),
-
- /**
-   * Metrics have a separate column family because they have a separate TTL.
- */
- METRICS("m");
-
- /**
- * Byte representation of this column family.
- */
- private final byte[] bytes;
-
- /**
- * @param value create a column family with this name. Must be lower case and
- * without spaces.
- */
- private ApplicationColumnFamily(String value) {
- // column families should be lower case and not contain any spaces.
- this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
- }
-
- public byte[] getBytes() {
- return Bytes.copy(bytes);
- }
-
-}
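
The family split pays off on the read side: a reader can touch only the family it needs. A hedged sketch using the standard HBase client Get; the Table handle is assumed to be obtained from a Connection elsewhere.

    package org.apache.hadoop.yarn.server.timelineservice.storage.application;

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;

    class MetricsOnlyReadSketch {
      // Restricting the Get to the "m" family avoids pulling the potentially
      // large config values stored under "c" in the same row.
      static Result readMetricsOnly(Table table, byte[] rowKey)
          throws IOException {
        Get get = new Get(rowKey);
        get.addFamily(ApplicationColumnFamily.METRICS.getBytes());
        return table.get(get);
      }
    }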
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
deleted file mode 100644
index 8297dc5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
-
-/**
- * Identifies partially qualified columns for the application table.
- */
-public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
-
- /**
- * To store TimelineEntity getIsRelatedToEntities values.
- */
- IS_RELATED_TO(ApplicationColumnFamily.INFO, "s"),
-
- /**
- * To store TimelineEntity getRelatesToEntities values.
- */
- RELATES_TO(ApplicationColumnFamily.INFO, "r"),
-
- /**
- * To store TimelineEntity info values.
- */
- INFO(ApplicationColumnFamily.INFO, "i"),
-
- /**
- * Lifecycle events for an application.
- */
- EVENT(ApplicationColumnFamily.INFO, "e"),
-
- /**
- * Config column stores configuration with config key as the column name.
- */
- CONFIG(ApplicationColumnFamily.CONFIGS, null),
-
- /**
- * Metrics are stored with the metric name as the column name.
- */
- METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter());
-
- private final ColumnHelper<ApplicationTable> column;
- private final ColumnFamily<ApplicationTable> columnFamily;
-
- /**
- * Can be null for those cases where the provided column qualifier is the
- * entire column name.
- */
- private final String columnPrefix;
- private final byte[] columnPrefixBytes;
-
- /**
- * Private constructor, meant to be used by the enum definition.
- *
- * @param columnFamily that this column is stored in.
- * @param columnPrefix for this column.
- */
- private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
- String columnPrefix) {
- this(columnFamily, columnPrefix, GenericConverter.getInstance());
- }
-
- /**
- * Private constructor, meant to be used by the enum definition.
- *
- * @param columnFamily that this column is stored in.
- * @param columnPrefix for this column.
- * @param converter used to encode/decode values to be stored in HBase for
- * this column prefix.
- */
- private ApplicationColumnPrefix(ColumnFamily<ApplicationTable> columnFamily,
- String columnPrefix, ValueConverter converter) {
- column = new ColumnHelper<ApplicationTable>(columnFamily, converter);
- this.columnFamily = columnFamily;
- this.columnPrefix = columnPrefix;
- if (columnPrefix == null) {
- this.columnPrefixBytes = null;
- } else {
- // Future-proof by ensuring the right column prefix hygiene.
- this.columnPrefixBytes =
- Bytes.toBytes(Separator.SPACE.encode(columnPrefix));
- }
- }
-
- /**
-   * @return the column prefix as a String.
- */
- private String getColumnPrefix() {
- return columnPrefix;
- }
-
- @Override
- public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
- return ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifierPrefix);
- }
-
- @Override
- public byte[] getColumnPrefixBytes(String qualifierPrefix) {
- return ColumnHelper.getColumnQualifier(
- this.columnPrefixBytes, qualifierPrefix);
- }
-
- @Override
- public byte[] getColumnFamilyBytes() {
- return columnFamily.getBytes();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #store(byte[],
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.
- * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
- */
- public void store(byte[] rowKey,
- TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
- Long timestamp, Object inputValue, Attribute... attributes)
- throws IOException {
-
- // Null check
- if (qualifier == null) {
- throw new IOException("Cannot store column with null qualifier in "
- + tableMutator.getName().getNameAsString());
- }
-
- byte[] columnQualifier = getColumnPrefixBytes(qualifier);
-
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
- attributes);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #store(byte[],
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.
- * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
- */
- public void store(byte[] rowKey,
- TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
- Long timestamp, Object inputValue, Attribute...attributes)
- throws IOException {
-
- // Null check
- if (qualifier == null) {
- throw new IOException("Cannot store column with null qualifier in "
- + tableMutator.getName().getNameAsString());
- }
-
- byte[] columnQualifier = getColumnPrefixBytes(qualifier);
-
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
- attributes);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
- */
- public Object readResult(Result result, String qualifier) throws IOException {
- byte[] columnQualifier =
- ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- return column.readResult(result, columnQualifier);
- }
-
- public ValueConverter getValueConverter() {
- return column.getValueConverter();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResults(org.apache.hadoop.hbase.client.Result,
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
- */
- public <K> Map<K, Object> readResults(Result result,
- KeyConverter<K> keyConverter) throws IOException {
- return column.readResults(result, columnPrefixBytes, keyConverter);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
- * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result,
- * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter)
- */
- public <K, V> NavigableMap<K, NavigableMap<Long, V>>
- readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
- throws IOException {
- return column.readResultsWithTimestamps(result, columnPrefixBytes,
- keyConverter);
- }
-
-}
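
A sketch of the prefix round trip: store one info entry under the INFO prefix, then decode every INFO-prefixed column in a row back into a map. The StringKeyConverter usage mirrors HBaseTimelineWriterImpl above; the mutator, the Result, and the "diagnostics" key are assumed/illustrative.

    package org.apache.hadoop.yarn.server.timelineservice.storage.application;

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;

    class ColumnPrefixSketch {
      private static final StringKeyConverter CONVERTER =
          new StringKeyConverter();

      // Stores a single info key/value, then reads all INFO-prefixed columns
      // back as a map keyed by the decoded qualifier.
      static Map<String, Object> roundTrip(byte[] rowKey,
          TypedBufferedMutator<ApplicationTable> mutator, Result result)
          throws IOException {
        ApplicationColumnPrefix.INFO.store(rowKey, mutator,
            CONVERTER.encode("diagnostics"), null, "ok");
        return ApplicationColumnPrefix.INFO.readResults(result, CONVERTER);
      }
    }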
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
deleted file mode 100644
index e89a6a7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
-
-import java.util.List;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-
-/**
- * Represents a rowkey for the application table.
- */
-public class ApplicationRowKey {
- private final String clusterId;
- private final String userId;
- private final String flowName;
- private final Long flowRunId;
- private final String appId;
- private final ApplicationRowKeyConverter appRowKeyConverter =
- new ApplicationRowKeyConverter();
-
- public ApplicationRowKey(String clusterId, String userId, String flowName,
- Long flowRunId, String appId) {
- this.clusterId = clusterId;
- this.userId = userId;
- this.flowName = flowName;
- this.flowRunId = flowRunId;
- this.appId = appId;
- }
-
- public String getClusterId() {
- return clusterId;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public String getFlowName() {
- return flowName;
- }
-
- public Long getFlowRunId() {
- return flowRunId;
- }
-
- public String getAppId() {
- return appId;
- }
-
- /**
- * Constructs a row key for the application table as follows:
- * {@code clusterId!userName!flowName!flowRunId!appId}.
- *
- * @return byte array with the row key
- */
- public byte[] getRowKey() {
- return appRowKeyConverter.encode(this);
- }
-
- /**
- * Given the raw row key as bytes, returns the row key as an object.
- *
- * @param rowKey Byte representation of row key.
- * @return An <cite>ApplicationRowKey</cite> object.
- */
- public static ApplicationRowKey parseRowKey(byte[] rowKey) {
- return new ApplicationRowKeyConverter().decode(rowKey);
- }
-
- /**
- * Constructs a row key for the application table as follows:
- * {@code clusterId!userName!flowName!flowRunId!appId}.
- * @return String representation of row key.
- */
- public String getRowKeyAsString() {
- return appRowKeyConverter.encodeAsString(this);
- }
-
- /**
- * Given the encoded row key as string, returns the row key as an object.
- * @param encodedRowKey String representation of row key.
- * @return An <cite>ApplicationRowKey</cite> object.
- */
- public static ApplicationRowKey parseRowKeyFromString(String encodedRowKey) {
- return new ApplicationRowKeyConverter().decodeFromString(encodedRowKey);
- }
-
- /**
- * Encodes and decodes row key for application table. The row key is of the
- * form: clusterId!userName!flowName!flowRunId!appId. flowRunId is a long,
- * appId is encoded and decoded using {@link AppIdKeyConverter} and the rest
- * are strings.
- */
- private static final class ApplicationRowKeyConverter implements
- KeyConverter<ApplicationRowKey>, KeyConverterToString<ApplicationRowKey> {
-
- private final KeyConverter<String> appIDKeyConverter =
- new AppIdKeyConverter();
-
- /**
- * Intended for use in ApplicationRowKey only.
- */
- private ApplicationRowKeyConverter() {
- }
-
- /**
- * Application row key is of the form
- * clusterId!userName!flowName!flowRunId!appId, with each segment separated
- * by !. The sizes below indicate the size of each of these segments in
- * sequence. clusterId, userName and flowName are strings. flowRunId is a
- * long and hence 8 bytes in size. appId is represented as 12 bytes: the
- * cluster timestamp part of the appId takes 8 bytes (long) and the
- * sequence id takes 4 bytes (int). Strings are variable in size (i.e. they
- * end whenever the separator is encountered). This is used while decoding
- * and helps in determining where to split.
- */
- private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
- Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
- AppIdKeyConverter.getKeySize() };
-
- /*
- * (non-Javadoc)
- *
- * Encodes an ApplicationRowKey object into a byte array, with each
- * component/field in ApplicationRowKey separated by Separator#QUALIFIERS.
- * This leads to an application table row key of the form
- * clusterId!userName!flowName!flowRunId!appId. If flowRunId in the passed
- * ApplicationRowKey object is null (and the fields preceding it, i.e.
- * clusterId, userId and flowName, are not null), this returns a row key
- * prefix of the form clusterId!userName!flowName!, and if appId in
- * ApplicationRowKey is null (and the other four components are not null),
- * this returns a row key prefix of the form
- * clusterId!userName!flowName!flowRunId!. flowRunId is inverted while
- * encoding, as it helps maintain a descending order for row keys in the
- * application table.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common
- * .KeyConverter#encode(java.lang.Object)
- */
- @Override
- public byte[] encode(ApplicationRowKey rowKey) {
- byte[] cluster =
- Separator.encode(rowKey.getClusterId(), Separator.SPACE,
- Separator.TAB, Separator.QUALIFIERS);
- byte[] user =
- Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
- Separator.QUALIFIERS);
- byte[] flow =
- Separator.encode(rowKey.getFlowName(), Separator.SPACE,
- Separator.TAB, Separator.QUALIFIERS);
- byte[] first = Separator.QUALIFIERS.join(cluster, user, flow);
- // Note that flowRunId is a long, so we can't encode them all at the same
- // time.
- if (rowKey.getFlowRunId() == null) {
- return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
- }
- byte[] second =
- Bytes.toBytes(LongConverter.invertLong(
- rowKey.getFlowRunId()));
- if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) {
- return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES);
- }
- byte[] third = appIDKeyConverter.encode(rowKey.getAppId());
- return Separator.QUALIFIERS.join(first, second, third);
- }
-
- /*
- * (non-Javadoc)
- *
- * Decodes an application row key of the form
- * clusterId!userName!flowName!flowRunId!appId represented in byte format
- * and converts it into an ApplicationRowKey object. flowRunId is inverted
- * while decoding, as it was inverted while encoding.
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.common
- * .KeyConverter#decode(byte[])
- */
- @Override
- public ApplicationRowKey decode(byte[] rowKey) {
- byte[][] rowKeyComponents =
- Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
- if (rowKeyComponents.length != 5) {
- throw new IllegalArgumentException("the row key is not valid for "
- + "an application");
- }
- String clusterId =
- Separator.decode(Bytes.toString(rowKeyComponents[0]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String userId =
- Separator.decode(Bytes.toString(rowKeyComponents[1]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- String flowName =
- Separator.decode(Bytes.toString(rowKeyComponents[2]),
- Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
- Long flowRunId =
- LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
- String appId = appIDKeyConverter.decode(rowKeyComponents[4]);
- return new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
- appId);
- }
-
- @Override
- public String encodeAsString(ApplicationRowKey key) {
- if (key.clusterId == null || key.userId == null || key.flowName == null
- || key.flowRunId == null || key.appId == null) {
- throw new IllegalArgumentException();
- }
- return TimelineReaderUtils
- .joinAndEscapeStrings(new String[] {key.clusterId, key.userId,
- key.flowName, key.flowRunId.toString(), key.appId});
- }
-
- @Override
- public ApplicationRowKey decodeFromString(String encodedRowKey) {
- List<String> split = TimelineReaderUtils.split(encodedRowKey);
- if (split == null || split.size() != 5) {
- throw new IllegalArgumentException(
- "Invalid row key for application table.");
- }
- Long flowRunId = Long.valueOf(split.get(3));
- return new ApplicationRowKey(split.get(0), split.get(1), split.get(2),
- flowRunId, split.get(4));
- }
- }
-
-}
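
As a quick illustration of the API defined above, a round trip through both
the binary and the string codecs (all values here are made up for the
example):

    ApplicationRowKey key = new ApplicationRowKey("clusterA", "alice",
        "dataFlow", 1002345678919L, "application_1111111111_2222");

    byte[] bytes = key.getRowKey();              // binary form for HBase
    ApplicationRowKey decoded = ApplicationRowKey.parseRowKey(bytes);

    String encoded = key.getRowKeyAsString();    // escaped string form
    ApplicationRowKey fromString =
        ApplicationRowKey.parseRowKeyFromString(encoded);
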
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java
deleted file mode 100644
index f61b0e9..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
-
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
-
-/**
- * Represents a partial rowkey (without appId, or without flowRunId and
- * appId) for the application table.
- */
-public class ApplicationRowKeyPrefix extends ApplicationRowKey implements
- RowKeyPrefix<ApplicationRowKey> {
-
- /**
- * Creates a prefix which generates the following rowKeyPrefixes for the
- * application table: {@code clusterId!userName!flowName!}.
- *
- * @param clusterId the cluster on which applications ran
- * @param userId the user that ran applications
- * @param flowName the name of the flow that was run by the user on the
- * cluster
- */
- public ApplicationRowKeyPrefix(String clusterId, String userId,
- String flowName) {
- super(clusterId, userId, flowName, null, null);
- }
-
- /**
- * Creates a prefix which generates the following rowKeyPrefixes for the
- * application table: {@code clusterId!userName!flowName!flowRunId!}.
- *
- * @param clusterId identifying the cluster
- * @param userId identifying the user
- * @param flowName identifying the flow
- * @param flowRunId identifying the instance of this flow
- */
- public ApplicationRowKeyPrefix(String clusterId, String userId,
- String flowName, Long flowRunId) {
- super(clusterId, userId, flowName, flowRunId, null);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.application.
- * RowKeyPrefix#getRowKeyPrefix()
- */
- @Override
- public byte[] getRowKeyPrefix() {
- return super.getRowKey();
- }
-
-}
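
A sketch of how such a prefix is typically consumed, restricting an HBase
scan to the applications of one flow run. The Scan/Table wiring around the
class above (applicationTable as an org.apache.hadoop.hbase.client.Table
handle) is an assumption for illustration:

    // Prefix of the form clusterId!userName!flowName!flowRunId!
    byte[] prefix = new ApplicationRowKeyPrefix("clusterA", "alice",
        "dataFlow", 1002345678919L).getRowKeyPrefix();

    Scan scan = new Scan();
    scan.setRowPrefixFilter(prefix);  // only rows starting with the prefix
    ResultScanner scanner = applicationTable.getScanner(scan);
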
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
deleted file mode 100644
index 4da720e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The application table has column families info, config and metrics. Info
- * stores information about a YARN application entity, config stores
- * configuration data of a YARN application, metrics stores the metrics of a
- * YARN application. This table is entirely analogous to the entity table but
- * created for better performance.
- *
- * Example application table record:
- *
- * <pre>
- * |-------------------------------------------------------------------------|
- * | Row | Column Family | Column Family| Column Family|
- * | key | info | metrics | config |
- * |-------------------------------------------------------------------------|
- * | clusterId! | id:appId | metricId1: | configKey1: |
- * | userName! | | metricValue1 | configValue1 |
- * | flowName! | created_time: | @timestamp1 | |
- * | flowRunId! | 1392993084018 | | configKey2: |
- * | AppId | | metricId1: | configValue2 |
- * | | i!infoKey: | metricValue2 | |
- * | | infoValue | @timestamp2 | |
- * | | | | |
- * | | r!relatesToKey: | metricId2: | |
- * | | id3=id4=id5 | metricValue1 | |
- * | | | @timestamp2 | |
- * | | s!isRelatedToKey: | | |
- * | | id7=id9=id6 | | |
- * | | | | |
- * | | e!eventId=timestamp=infoKey: | | |
- * | | eventInfoValue | | |
- * | | | | |
- * | | flowVersion: | | |
- * | | versionValue | | |
- * |-------------------------------------------------------------------------|
- * </pre>
- */
-public class ApplicationTable extends BaseTable<ApplicationTable> {
- /** application prefix. */
- private static final String PREFIX =
- YarnConfiguration.TIMELINE_SERVICE_PREFIX + "application";
-
- /** config param name that specifies the application table name. */
- public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
-
- /**
- * config param name that specifies the TTL for metrics column family in
- * application table.
- */
- private static final String METRICS_TTL_CONF_NAME = PREFIX
- + ".table.metrics.ttl";
-
- /**
- * config param name that specifies max-versions for metrics column family in
- * the application table.
- */
- private static final String METRICS_MAX_VERSIONS =
- PREFIX + ".table.metrics.max-versions";
-
- /** default value for application table name. */
- private static final String DEFAULT_TABLE_NAME =
- "timelineservice.application";
-
- /** default TTL is 30 days for metrics timeseries. */
- private static final int DEFAULT_METRICS_TTL = 2592000;
-
- /** default max number of versions. */
- private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000;
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ApplicationTable.class);
-
- public ApplicationTable() {
- super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
- }
-
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
- * (org.apache.hadoop.hbase.client.Admin,
- * org.apache.hadoop.conf.Configuration)
- */
- public void createTable(Admin admin, Configuration hbaseConf)
- throws IOException {
-
- TableName table = getTableName(hbaseConf);
- if (admin.tableExists(table)) {
- // do not disable / delete existing table
- // similar to the approach taken by map-reduce jobs when
- // output directory exists
- throw new IOException("Table " + table.getNameAsString()
- + " already exists.");
- }
-
- HTableDescriptor applicationTableDescp = new HTableDescriptor(table);
- HColumnDescriptor infoCF =
- new HColumnDescriptor(ApplicationColumnFamily.INFO.getBytes());
- infoCF.setBloomFilterType(BloomType.ROWCOL);
- applicationTableDescp.addFamily(infoCF);
-
- HColumnDescriptor configCF =
- new HColumnDescriptor(ApplicationColumnFamily.CONFIGS.getBytes());
- configCF.setBloomFilterType(BloomType.ROWCOL);
- configCF.setBlockCacheEnabled(true);
- applicationTableDescp.addFamily(configCF);
-
- HColumnDescriptor metricsCF =
- new HColumnDescriptor(ApplicationColumnFamily.METRICS.getBytes());
- applicationTableDescp.addFamily(metricsCF);
- metricsCF.setBlockCacheEnabled(true);
- // always keep 1 version (the latest)
- metricsCF.setMinVersions(1);
- metricsCF.setMaxVersions(
- hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS));
- metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME,
- DEFAULT_METRICS_TTL));
- applicationTableDescp.setRegionSplitPolicyClassName(
- "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
- applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
- TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
- admin.createTable(applicationTableDescp,
- TimelineHBaseSchemaConstants.getUsernameSplits());
- LOG.info("Status of table creation for " + table.getNameAsString() + "="
- + admin.tableExists(table));
- }
-
- /**
- * @param metricsTTL time to live parameter for the metrics in this table.
- * @param hbaseConf configuration in which to set the metrics TTL config
- * variable.
- */
- public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) {
- hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL);
- }
-
-}
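
For reference, a minimal sketch of driving the schema creation above with a
shortened metrics TTL, using the public setMetricsTTL hook. The Admin handle
and the seven-day value are illustrative assumptions:

    Configuration hbaseConf = HBaseConfiguration.create();
    ApplicationTable appTable = new ApplicationTable();

    // Keep metrics for 7 days instead of the 30-day default.
    appTable.setMetricsTTL(7 * 24 * 60 * 60, hbaseConf);
    appTable.createTable(admin, hbaseConf);  // throws if the table exists
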
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
deleted file mode 100644
index 03f508f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Package org.apache.hadoop.yarn.server.timelineservice.storage.application
- * contains classes related to implementation for application table.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
deleted file mode 100644
index 67497fc..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
-
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
-
-import java.io.IOException;
-
-/**
- * Identifies fully qualified columns for the {@link AppToFlowTable}.
- */
-public enum AppToFlowColumn implements Column<AppToFlowTable> {
-
- /**
- * The flow ID.
- */
- FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"),
-
- /**
- * The flow run ID.
- */
- FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"),
-
- /**
- * The user.
- */
- USER_ID(AppToFlowColumnFamily.MAPPING, "user_id");
-
- private final ColumnHelper<AppToFlowTable> column;
- private final ColumnFamily<AppToFlowTable> columnFamily;
- private final String columnQualifier;
- private final byte[] columnQualifierBytes;
-
- AppToFlowColumn(ColumnFamily<AppToFlowTable> columnFamily,
- String columnQualifier) {
- this.columnFamily = columnFamily;
- this.columnQualifier = columnQualifier;
- // Future-proof by ensuring the right column prefix hygiene.
- this.columnQualifierBytes =
- Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
- this.column = new ColumnHelper<AppToFlowTable>(columnFamily);
- }
-
- /**
- * @return the column name value
- */
- private String getColumnQualifier() {
- return columnQualifier;
- }
-
- @Override
- public byte[] getColumnQualifierBytes() {
- return columnQualifierBytes.clone();
- }
-
- public void store(byte[] rowKey,
- TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
- Object inputValue, Attribute... attributes) throws IOException {
- column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
- inputValue, attributes);
- }
-
- @Override
- public byte[] getColumnFamilyBytes() {
- return columnFamily.getBytes();
- }
-
- @Override
- public ValueConverter getValueConverter() {
- return column.getValueConverter();
- }
-
- public Object readResult(Result result) throws IOException {
- return column.readResult(result, columnQualifierBytes);
- }
-
-}
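
And a short sketch of the write/read pattern for the enum above; rowKey, the
buffered mutator and the Result are assumed to be supplied by the
surrounding writer/reader code:

    // Write the mapping columns; a null timestamp defers to the server.
    AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTableMutator, null,
        "dataFlow");
    AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTableMutator, null,
        1002345678919L);

    // Read a single fully qualified column back from an HBase Result.
    Object flowId = AppToFlowColumn.FLOW_ID.readResult(result);
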