You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:31 UTC
[14/50] [abbrv] hadoop git commit: YARN-3901. Populate flow run data
in the flow_run & flow activity tables (Vrushali C via sjlee)
YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a469bfe7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a469bfe7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a469bfe7
Branch: refs/heads/feature-YARN-2928
Commit: a469bfe7c5d0b71cc0a3cc4f010b17e8f4267872
Parents: 983e729
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Sep 17 10:34:52 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:41:57 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop-yarn-server-timelineservice/pom.xml | 13 +
.../storage/HBaseTimelineWriterImpl.java | 179 ++++++-
.../storage/TimelineSchemaCreator.java | 22 +-
.../storage/application/ApplicationColumn.java | 5 +-
.../application/ApplicationColumnPrefix.java | 15 +-
.../storage/apptoflow/AppToFlowColumn.java | 6 +-
.../timelineservice/storage/common/Column.java | 6 +-
.../storage/common/ColumnHelper.java | 93 +++-
.../storage/common/ColumnPrefix.java | 28 +-
.../storage/common/TimelineWriterUtils.java | 185 +++++++
.../storage/common/TimestampGenerator.java | 112 +++++
.../storage/common/package-info.java | 24 -
.../storage/entity/EntityColumn.java | 6 +-
.../storage/entity/EntityColumnPrefix.java | 20 +-
.../flow/AggregationCompactionDimension.java | 63 +++
.../storage/flow/AggregationOperation.java | 87 ++++
.../timelineservice/storage/flow/Attribute.java | 39 ++
.../storage/flow/FlowActivityColumnFamily.java | 54 +++
.../storage/flow/FlowActivityColumnPrefix.java | 243 ++++++++++
.../storage/flow/FlowActivityRowKey.java | 113 +++++
.../storage/flow/FlowActivityTable.java | 107 ++++
.../storage/flow/FlowRunColumn.java | 161 ++++++
.../storage/flow/FlowRunColumnFamily.java | 54 +++
.../storage/flow/FlowRunColumnPrefix.java | 239 +++++++++
.../storage/flow/FlowRunCoprocessor.java | 210 ++++++++
.../storage/flow/FlowRunRowKey.java | 50 ++
.../storage/flow/FlowRunTable.java | 141 ++++++
.../storage/flow/FlowScanner.java | 486 +++++++++++++++++++
.../storage/TestHBaseTimelineStorage.java | 28 +-
.../storage/flow/TestFlowDataGenerator.java | 213 ++++++++
.../flow/TestHBaseStorageFlowActivity.java | 372 ++++++++++++++
.../storage/flow/TestHBaseStorageFlowRun.java | 290 +++++++++++
33 files changed, 3562 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 667efd7..98c5e94 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -103,6 +103,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-4102. Add a "skip existing table" mode for timeline schema creator (Li
Lu via sjlee)
+ YARN-3901. Populate flow run data in the flow_run & flow activity tables
+ (Vrushali C via sjlee)
+
IMPROVEMENTS
YARN-3276. Code cleanup for timeline service API records. (Junping Du via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index 3c41bce..616ed06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -180,6 +180,19 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <additionnalDependencies>
+ <additionnalDependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ </additionnalDependency>
+ </additionnalDependencies>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 772002d..7c4a5da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -33,11 +33,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -53,23 +52,36 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
/**
- * This implements a hbase based backend for storing application timeline entity
+ * This implements a hbase based backend for storing the timeline entity
* information.
+ * It writes to multiple tables at the backend
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class HBaseTimelineWriterImpl extends AbstractService implements
TimelineWriter {
+ private static final Log LOG = LogFactory
+ .getLog(HBaseTimelineWriterImpl.class);
+
private Connection conn;
private TypedBufferedMutator<EntityTable> entityTable;
private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
private TypedBufferedMutator<ApplicationTable> applicationTable;
-
- private static final Log LOG = LogFactory
- .getLog(HBaseTimelineWriterImpl.class);
+ private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+ private TypedBufferedMutator<FlowRunTable> flowRunTable;
public HBaseTimelineWriterImpl() {
super(HBaseTimelineWriterImpl.class.getName());
@@ -91,6 +103,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
+ flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
+ flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn);
}
/**
@@ -111,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// if the entity is the application, the destination is the application
// table
- boolean isApplication = isApplicationEntity(te);
+ boolean isApplication = TimelineWriterUtils.isApplicationEntity(te);
byte[] rowKey = isApplication ?
ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
appId) :
@@ -124,37 +138,139 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
storeMetrics(rowKey, te.getMetrics(), isApplication);
storeRelations(rowKey, te, isApplication);
- if (isApplicationCreated(te)) {
- onApplicationCreated(
- clusterId, userId, flowName, flowVersion, flowRunId, appId, te);
+ if (isApplication) {
+ if (TimelineWriterUtils.isApplicationCreated(te)) {
+ onApplicationCreated(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
+ }
+ // if it's an application entity, store metrics
+ storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
+ appId, te);
+ // if application has finished, store its finish time and write final
+ // values
+ // of all metrics
+ if (TimelineWriterUtils.isApplicationFinished(te)) {
+ onApplicationFinished(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
+ }
}
}
return putStatus;
}
- private static boolean isApplicationEntity(TimelineEntity te) {
- return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+ private void onApplicationCreated(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntity te) throws IOException {
+ // store in App to flow table
+ storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId,
+ appId, te);
+ // store in flow run table
+ storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
+ // store in flow activity table
+ storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
}
- private static boolean isApplicationCreated(TimelineEntity te) {
- if (isApplicationEntity(te)) {
- for (TimelineEvent event : te.getEvents()) {
- if (event.getId().equals(
- ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
- return true;
- }
- }
- }
- return false;
+ /*
+ * updates the {@link FlowActivityTable} with the Application TimelineEntity
+ * information
+ */
+ private void storeInFlowActivityTable(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntity te) throws IOException {
+ byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName);
+ byte[] qualifier = GenericObjectMapper.write(flowRunId);
+ FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+ null, flowVersion,
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
}
- private void onApplicationCreated(String clusterId, String userId,
+ /*
+ * updates the {@link FlowRunTable} with Application Created information
+ */
+ private void storeAppCreatedInFlowRunTable(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntity te) throws IOException {
+ byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+ flowRunId);
+ FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
+ te.getCreatedTime(),
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+ }
+
+ private void storeInAppToFlowTable(String clusterId, String userId,
String flowName, String flowVersion, long flowRunId, String appId,
TimelineEntity te) throws IOException {
byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
- AppToFlowColumn.FLOW_RUN_ID.store(
- rowKey, appToFlowTable, null, flowRunId);
+ AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
+ }
+
+ /*
+ * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
+ * application has finished
+ */
+ private void onApplicationFinished(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntity te) throws IOException {
+ // store in flow run table
+ storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
+ appId, te);
+
+ // indicate in the flow activity table that the app has finished
+ storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
+ flowRunId, appId, te);
+ }
+
+ /*
+ * Update the {@link FlowRunTable} with Application Finished information
+ */
+ private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
+ String flowName, long flowRunId, String appId, TimelineEntity te)
+ throws IOException {
+ byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+ flowRunId);
+ Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
+ .getAttribute(appId);
+ FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
+ TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId);
+
+ // store the final value of metrics since application has finished
+ Set<TimelineMetric> metrics = te.getMetrics();
+ if (metrics != null) {
+ storeFlowMetrics(rowKey, metrics, attributeAppId,
+ AggregationOperation.SUM_FINAL.getAttribute());
+ }
+ }
+
+ /*
+ * Updates the {@link FlowRunTable} with Application Metrics
+ */
+ private void storeFlowMetricsAppRunning(String clusterId, String userId,
+ String flowName, long flowRunId, String appId, TimelineEntity te)
+ throws IOException {
+ Set<TimelineMetric> metrics = te.getMetrics();
+ if (metrics != null) {
+ byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+ flowRunId);
+ storeFlowMetrics(rowKey, metrics,
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+ }
+ }
+
+ private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+ Attribute... attributes) throws IOException {
+ for (TimelineMetric metric : metrics) {
+ String metricColumnQualifier = metric.getId();
+ Map<Long, Number> timeseries = metric.getValues();
+ for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+ Long timestamp = timeseriesEntry.getKey();
+ FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
+ metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
+ attributes);
+ }
+ }
}
private void storeRelations(byte[] rowKey, TimelineEntity te,
@@ -184,7 +300,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
// id3?id4?id5
String compoundValue =
Separator.VALUES.joinEncoded(connectedEntity.getValue());
-
columnPrefix.store(rowKey, table, connectedEntity.getKey(), null,
compoundValue);
}
@@ -342,6 +457,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
entityTable.flush();
appToFlowTable.flush();
applicationTable.flush();
+ flowRunTable.flush();
+ flowActivityTable.flush();
}
/**
@@ -364,6 +481,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
LOG.info("closing the application table");
applicationTable.close();
}
+ if (flowRunTable != null) {
+ LOG.info("closing the flow run table");
+ // The close API performs flushing and releases any resources held
+ flowRunTable.close();
+ }
+ if (flowActivityTable != null) {
+ LOG.info("closing the flowActivityTable table");
+ // The close API performs flushing and releases any resources held
+ flowActivityTable.close();
+ }
if (conn != null) {
LOG.info("closing the hbase Connection");
conn.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index e7e51a7..cbcff4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
/**
* This creates the schema for a hbase based backend for storing application
@@ -199,7 +201,7 @@ public class TimelineSchemaCreator {
return commandLine;
}
- private static void createAllTables(Configuration hbaseConf,
+ public static void createAllTables(Configuration hbaseConf,
boolean skipExisting) throws IOException {
Connection conn = null;
@@ -236,6 +238,24 @@ public class TimelineSchemaCreator {
throw e;
}
}
+ try {
+ new FlowRunTable().createTable(admin, hbaseConf);
+ } catch (IOException e) {
+ if (skipExisting) {
+ LOG.warn("Skip and continue on: " + e.getMessage());
+ } else {
+ throw e;
+ }
+ }
+ try {
+ new FlowActivityTable().createTable(admin, hbaseConf);
+ } catch (IOException e) {
+ if (skipExisting) {
+ LOG.warn("Skip and continue on: " + e.getMessage());
+ } else {
+ throw e;
+ }
+ }
} finally {
if (conn != null) {
conn.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
index c028386..802626d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies fully qualified columns for the {@link ApplicationTable}.
@@ -76,9 +77,9 @@ public enum ApplicationColumn implements Column<ApplicationTable> {
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
- Object inputValue) throws IOException {
+ Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
- inputValue);
+ inputValue, attributes);
}
public Object readResult(Result result) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index ad1def6..d7b5773 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies partially qualified columns for the application table.
@@ -112,7 +113,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*/
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
- Long timestamp, Object inputValue) throws IOException {
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
// Null check
if (qualifier == null) {
@@ -123,8 +125,9 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
- }
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
+ }
/*
* (non-Javadoc)
@@ -137,7 +140,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
*/
public void store(byte[] rowKey,
TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
- Long timestamp, Object inputValue) throws IOException {
+ Long timestamp, Object inputValue, Attribute...attributes)
+ throws IOException {
// Null check
if (qualifier == null) {
@@ -148,7 +152,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
}
/*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
index 423037a..859fdca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
@@ -25,8 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
import java.io.IOException;
+import java.util.Map;
/**
* Identifies fully qualified columns for the {@link AppToFlowTable}.
@@ -67,9 +69,9 @@ public enum AppToFlowColumn implements Column<AppToFlowTable> {
public void store(byte[] rowKey,
TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
- Object inputValue) throws IOException {
+ Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
- inputValue);
+ inputValue, attributes);
}
public Object readResult(Result result) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
index 3397d62..64c1cda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* A Column represents the way to store a fully qualified column in a specific
@@ -38,12 +39,15 @@ public interface Column<T> {
* column.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
+ * @param attributes attributes for this mutation, used in the coprocessor
+ * to set/read the cell tags. Can be null.
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
- Long timestamp, Object inputValue) throws IOException;
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException;
/**
* Get the latest version of this specified column. Note: this call clones the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index f1b7c58..3a2e088 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* This class is meant to be used only by explicit Columns, and not directly to
* write by clients.
@@ -58,31 +59,66 @@ public class ColumnHelper<T> {
* Sends a Mutation to the table. The mutations will be buffered and sent over
* the wire as part of a batch.
*
- * @param rowKey identifying the row to write. Nothing gets written when null.
- * @param tableMutator used to modify the underlying HBase table
- * @param columnQualifier column qualifier. Nothing gets written when null.
- * @param timestamp version timestamp. When null the server timestamp will be
- * used.
- * @param inputValue the value to write to the rowKey and column qualifier.
- * Nothing gets written when null.
+ * @param rowKey
+ * identifying the row to write. Nothing gets written when null.
+ * @param tableMutator
+ * used to modify the underlying HBase table
+ * @param columnQualifier
+ * column qualifier. Nothing gets written when null.
+ * @param timestamp
+ * version timestamp. When null the current timestamp multiplied with
+ * TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of
+ * app id will be used
+ * @param inputValue
+ * the value to write to the rowKey and column qualifier. Nothing
+ * gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
- byte[] columnQualifier, Long timestamp, Object inputValue)
- throws IOException {
+ byte[] columnQualifier, Long timestamp, Object inputValue,
+ Attribute... attributes) throws IOException {
if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
return;
}
Put p = new Put(rowKey);
+ timestamp = getPutTimestamp(timestamp, attributes);
+ p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
+ GenericObjectMapper.write(inputValue));
+ if ((attributes != null) && (attributes.length > 0)) {
+ for (Attribute attribute : attributes) {
+ p.setAttribute(attribute.getName(), attribute.getValue());
+ }
+ }
+ tableMutator.mutate(p);
+ }
+ /*
+ * Figures out the cell timestamp used in the Put for storing into the flow
+ * run table. We would like to left-shift the timestamp and supplement it
+ * with the app id so that there are no collisions in the flow run table's
+ * cells
+ private long getPutTimestamp(Long timestamp, Attribute[] attributes) {
if (timestamp == null) {
- p.addColumn(columnFamilyBytes, columnQualifier,
- GenericObjectMapper.write(inputValue));
- } else {
- p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
- GenericObjectMapper.write(inputValue));
+ timestamp = System.currentTimeMillis();
}
- tableMutator.mutate(p);
+ String appId = getAppIdFromAttributes(attributes);
+ long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
+ timestamp, appId);
+ return supplementedTS;
+ }
+
+ private String getAppIdFromAttributes(Attribute[] attributes) {
+ if (attributes == null) {
+ return null;
+ }
+ String appId = null;
+ for (Attribute attribute : attributes) {
+ if (AggregationCompactionDimension.APPLICATION_ID.toString().equals(
+ attribute.getName())) {
+ appId = Bytes.toString(attribute.getValue());
+ }
+ }
+ return appId;
}
/**
@@ -171,7 +207,9 @@ public class ColumnHelper<T> {
for (Entry<Long, byte[]> cell : cells.entrySet()) {
V value =
(V) GenericObjectMapper.read(cell.getValue());
- cellResults.put(cell.getKey(), value);
+ cellResults.put(
+ TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
+ value);
}
}
results.put(columnName, cellResults);
@@ -315,6 +353,27 @@ public class ColumnHelper<T> {
/**
* @param columnPrefixBytes The byte representation for the column prefix.
* Should not contain {@link Separator#QUALIFIERS}.
+ * @param qualifier for the remainder of the column.
+ * @return fully sanitized column qualifier that is a combination of prefix
+ * and qualifier. If prefix is null, the result is simply the encoded
+ * qualifier without any separator.
+ */
+ public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+ long qualifier) {
+
+ if (columnPrefixBytes == null) {
+ return Bytes.toBytes(qualifier);
+ }
+
+ // Convert qualifier to lower case, strip of separators and tag on column
+ // prefix.
+ byte[] columnQualifier =
+ Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier));
+ return columnQualifier;
+ }
+ /**
+ * @param columnPrefixBytes The byte representation for the column prefix.
+ * Should not contain {@link Separator#QUALIFIERS}.
* @param qualifier the byte representation for the remainder of the column.
* @return fully sanitized column qualifier that is a combination of prefix
* and qualifier. If prefix is null, the result is simply the encoded
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index 509ff49..db49098 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -23,6 +23,7 @@ import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Used to represent a partially qualified column, where the actual column name
@@ -43,12 +44,36 @@ public interface ColumnPrefix<T> {
* @param qualifier column qualifier. Nothing gets written when null.
* @param timestamp version timestamp. When null the server timestamp will be
* used.
+ * @param attributes attributes for the mutation that are used by the
+ * coprocessor to set/read the cell tags
* @param inputValue the value to write to the rowKey and column qualifier.
* Nothing gets written when null.
* @throws IOException
*/
public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
- String qualifier, Long timestamp, Object inputValue) throws IOException;
+ byte[] qualifier, Long timestamp, Object inputValue,
+ Attribute... attributes) throws IOException;
+
+ /**
+ * Sends a Mutation to the table. The mutations will be buffered and sent over
+ * the wire as part of a batch.
+ *
+ * @param rowKey identifying the row to write. Nothing gets written when null.
+ * @param tableMutator used to modify the underlying HBase table. Caller is
+ * responsible to pass a mutator for the table that actually has this
+ * column.
+ * @param qualifier column qualifier. Nothing gets written when null.
+ * @param timestamp version timestamp. When null the server timestamp will be
+ * used.
+ * @param attributes attributes for the mutation that are used by the
+ * coprocessor to set/read the cell tags
+ * @param inputValue the value to write to the rowKey and column qualifier.
+ * Nothing gets written when null.
+ * @throws IOException
+ */
+ public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+ String qualifier, Long timestamp, Object inputValue,
+ Attribute... attributes) throws IOException;
/**
* Get the latest version of this specified column. Note: this call clones the
@@ -81,4 +106,5 @@ public interface ColumnPrefix<T> {
*/
public <V> NavigableMap<String, NavigableMap<Long, V>>
readResultsWithTimestamps(Result result) throws IOException;
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
index 58bdedc7e..371371a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
@@ -19,9 +19,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
import java.util.ArrayList;
import java.util.List;
+import java.util.SortedSet;
+import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* bunch of utility functions used across TimelineWriter classes
@@ -36,6 +46,9 @@ public class TimelineWriterUtils {
/** indicator for no limits for splitting */
public static final int NO_LIMIT_SPLIT = -1;
+ /** milliseconds in one day */
+ public static final long MILLIS_ONE_DAY = 86400000L;
+
/**
* Splits the source array into multiple array segments using the given
* separator, up to a maximum of count items. This will naturally produce
@@ -140,4 +153,176 @@ public class TimelineWriterUtils {
return Long.MAX_VALUE - key;
}
+ /**
+ * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
+ * for a given input timestamp
+ *
+ * @param ts
+ * @return timestamp of that day's beginning (midnight)
+ */
+ public static long getTopOfTheDayTimestamp(long ts) {
+ long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
+ return dayTimestamp;
+ }
+
+ /**
+ * Combines the input array of attributes and the input aggregation operation
+ * into a new array of attributes.
+ *
+ * @param attributes
+ * @param aggOp
+ * @return array of combined attributes
+ */
+ public static Attribute[] combineAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
+ Attribute[] combinedAttributes = new Attribute[newLength];
+
+ if (attributes != null) {
+ System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
+ }
+
+ if (aggOp != null) {
+ Attribute a2 = aggOp.getAttribute();
+ combinedAttributes[newLength - 1] = a2;
+ }
+ return combinedAttributes;
+ }
+
+ /**
+ * Returns a number for the new array size. The new array is the combination
+ * of input array of attributes and the input aggregation operation.
+ *
+ * @param attributes
+ * @param aggOp
+ * @return the size for the new array
+ */
+ private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+ AggregationOperation aggOp) {
+ int oldLength = getAttributesLength(attributes);
+ int aggLength = getAppOpLength(aggOp);
+ return oldLength + aggLength;
+ }
+
+ private static int getAppOpLength(AggregationOperation aggOp) {
+ if (aggOp != null) {
+ return 1;
+ }
+ return 0;
+ }
+
+ private static int getAttributesLength(Attribute[] attributes) {
+ if (attributes != null) {
+ return attributes.length;
+ }
+ return 0;
+ }
+
+ /**
+ * checks if an application has finished
+ *
+ * @param te
+ * @return true if application has finished else false
+ */
+ public static boolean isApplicationFinished(TimelineEntity te) {
+ SortedSet<TimelineEvent> allEvents = te.getEvents();
+ if ((allEvents != null) && (allEvents.size() > 0)) {
+ TimelineEvent event = allEvents.last();
+ if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * get the time at which an app finished
+ *
+ * @param te
+ * @return true if application has finished else false
+ */
+ public static long getApplicationFinishedTime(TimelineEntity te) {
+ SortedSet<TimelineEvent> allEvents = te.getEvents();
+ if ((allEvents != null) && (allEvents.size() > 0)) {
+ TimelineEvent event = allEvents.last();
+ if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+ return event.getTimestamp();
+ }
+ }
+ return 0l;
+ }
+
+ /**
+ * Checks if the input TimelineEntity object is an ApplicationEntity.
+ *
+ * @param te
+ * @return true if input is an ApplicationEntity, false otherwise
+ */
+ public static boolean isApplicationEntity(TimelineEntity te) {
+ return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+ }
+
+ /**
+ * Checks for the APPLICATION_CREATED event.
+ *
+ * @param te
+ * @return true is application event exists, false otherwise
+ */
+ public static boolean isApplicationCreated(TimelineEntity te) {
+ if (isApplicationEntity(te)) {
+ for (TimelineEvent event : te.getEvents()) {
+ if (event.getId()
+ .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns the first seen aggregation operation as seen in the list of input
+ * tags or null otherwise
+ *
+ * @param tags
+ * @return AggregationOperation
+ */
+ public static AggregationOperation getAggregationOperationFromTagsList(
+ List<Tag> tags) {
+ for (AggregationOperation aggOp : AggregationOperation.values()) {
+ for (Tag tag : tags) {
+ if (tag.getType() == aggOp.getTagType()) {
+ return aggOp;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates a {@link Tag} from the input attribute.
+ *
+ * @param attribute
+ * @return Tag
+ */
+ public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
+ // attribute could be either an Aggregation Operation or
+ // an Aggregation Dimension
+ // Get the Tag type from either
+ AggregationOperation aggOp = AggregationOperation
+ .getAggregationOperation(attribute.getKey());
+ if (aggOp != null) {
+ Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+ return t;
+ }
+
+ AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
+ .getAggregationCompactionDimension(attribute.getKey());
+ if (aggCompactDim != null) {
+ Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+ return t;
+ }
+ return null;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
new file mode 100644
index 0000000..555b64e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * Utility class that allows HBase coprocessors to interact with unique
+ * timestamps.
+ */
+public class TimestampGenerator {
+
+ /*
+ * if this is changed, then reading cell timestamps written with older
+ * multiplier value will not work
+ */
+ public static final long TS_MULTIPLIER = 1000L;
+
+ private final AtomicLong lastTimestamp = new AtomicLong();
+
+ /**
+ * Returns the current wall clock time in milliseconds, multiplied by the
+ * required precision.
+ */
+ public long currentTime() {
+ // We want to align cell timestamps with current time.
+ // cell timestamps are not be less than
+ // System.currentTimeMillis() * TS_MULTIPLIER.
+ return System.currentTimeMillis() * TS_MULTIPLIER;
+ }
+
+ /**
+ * Returns a timestamp value unique within the scope of this
+ * {@code TimestampGenerator} instance. For usage by HBase
+ * {@code RegionObserver} coprocessors, this normally means unique within a
+ * given region.
+ *
+ * Unlikely scenario of generating a non-unique timestamp: if there is a
+ * sustained rate of more than 1M hbase writes per second AND if region fails
+ * over within that time range of timestamps being generated then there may be
+ * collisions writing to a cell version of the same column.
+ */
+ public long getUniqueTimestamp() {
+ long lastTs;
+ long nextTs;
+ do {
+ lastTs = lastTimestamp.get();
+ nextTs = Math.max(lastTs + 1, currentTime());
+ } while (!lastTimestamp.compareAndSet(lastTs, nextTs));
+ return nextTs;
+ }
+
+ /**
+ * returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
+ * application id
+ *
+ * Unlikely scenario of generating a timestamp that is a duplicate: If more
+ * than a 1000 concurrent apps are running in one flow run AND write to same
+ * column at the same time, then say appId of 1001 will overlap with appId of
+ * 001 and there may be collisions for that flow run's specific column.
+ *
+ * @param incomingTS
+ * @param appId
+ * @return a timestamp multiplied with TS_MULTIPLIER and last few digits of
+ * application id
+ */
+ public static long getSupplementedTimestamp(long incomingTS, String appId) {
+ long suffix = getAppIdSuffix(appId);
+ long outgoingTS = incomingTS * TS_MULTIPLIER + suffix;
+ return outgoingTS;
+
+ }
+
+ private static long getAppIdSuffix(String appIdStr) {
+ if (appIdStr == null) {
+ return 0L;
+ }
+ ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+ long id = appId.getId() % TS_MULTIPLIER;
+ return id;
+ }
+
+ /**
+ * truncates the last few digits of the timestamp which were supplemented by
+ * the TimestampGenerator#getSupplementedTimestamp function
+ *
+ * @param incomingTS
+ * @return a truncated timestamp value
+ */
+ public static long getTruncatedTimestamp(long incomingTS) {
+ return incomingTS / TS_MULTIPLIER;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
deleted file mode 100644
index 32577fb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 26e7748..8ae19b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies fully qualified columns for the {@link EntityTable}.
@@ -81,9 +83,9 @@ public enum EntityColumn implements Column<EntityTable> {
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, Long timestamp,
- Object inputValue) throws IOException {
+ Object inputValue, Attribute... attributes) throws IOException {
column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
- inputValue);
+ inputValue, attributes);
}
public Object readResult(Result result) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 75ff742..0d4e5a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
/**
* Identifies partially qualified columns for the entity table.
@@ -108,11 +109,13 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
* org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
* #store(byte[],
* org.apache.hadoop.yarn.server.timelineservice.storage.common.
- * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
*/
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
- Long timestamp, Object inputValue) throws IOException {
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
// Null check
if (qualifier == null) {
@@ -123,8 +126,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
- }
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
+ }
/*
* (non-Javadoc)
@@ -137,7 +141,8 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
*/
public void store(byte[] rowKey,
TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
- Long timestamp, Object inputValue) throws IOException {
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
// Null check
if (qualifier == null) {
@@ -148,8 +153,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
byte[] columnQualifier =
ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
- column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
- }
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ attributes);
+ }
/*
* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
new file mode 100644
index 0000000..ff12c7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the compaction dimensions for the data in the {@link FlowRunTable}
+ * .
+ */
+public enum AggregationCompactionDimension {
+
+ /**
+ * the application id
+ */
+ APPLICATION_ID((byte) 101);
+
+ private byte tagType;
+ private byte[] inBytes;
+
+ private AggregationCompactionDimension(byte tagType) {
+ this.tagType = tagType;
+ this.inBytes = Bytes.toBytes(this.name());
+ }
+
+ public Attribute getAttribute(String attributeValue) {
+ return new Attribute(this.name(), Bytes.toBytes(attributeValue));
+ }
+
+ public byte getTagType() {
+ return tagType;
+ }
+
+ public byte[] getInBytes() {
+ return this.inBytes.clone();
+ }
+
+ public static AggregationCompactionDimension getAggregationCompactionDimension(
+ String aggCompactDimStr) {
+ for (AggregationCompactionDimension aggDim : AggregationCompactionDimension
+ .values()) {
+ if (aggDim.name().equals(aggCompactDimStr)) {
+ return aggDim;
+ }
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
new file mode 100644
index 0000000..c635ce6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the attributes to be set for puts into the {@link FlowRunTable}.
+ * The numbers used for tagType are prime numbers
+ */
+public enum AggregationOperation {
+
+ /**
+ * When the flow was started.
+ */
+ MIN((byte) 71),
+
+ /**
+ * When it ended.
+ */
+ MAX((byte) 73),
+
+ /**
+ * The metrics of the flow
+ */
+ SUM((byte) 79),
+
+ /**
+ * application running
+ */
+ SUM_FINAL((byte) 83),
+
+ /**
+ * compact
+ */
+ COMPACT((byte) 89);
+
+ private byte tagType;
+ private byte[] inBytes;
+
+ private AggregationOperation(byte tagType) {
+ this.tagType = tagType;
+ this.inBytes = Bytes.toBytes(this.name());
+ }
+
+ public Attribute getAttribute() {
+ return new Attribute(this.name(), this.inBytes);
+ }
+
+ public byte getTagType() {
+ return tagType;
+ }
+
+ public byte[] getInBytes() {
+ return this.inBytes.clone();
+ }
+
+ /**
+ * returns the AggregationOperation enum that represents that string
+ * @param aggOpStr
+ * @return the AggregationOperation enum that represents that string
+ */
+ public static AggregationOperation getAggregationOperation(String aggOpStr) {
+ for (AggregationOperation aggOp : AggregationOperation.values()) {
+ if (aggOp.name().equals(aggOpStr)) {
+ return aggOp;
+ }
+ }
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
new file mode 100644
index 0000000..d3de518
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+/**
+ * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}.
+ *
+ * Instances are immutable; the value bytes are defensively copied on the way
+ * in and on the way out.
+ */
+public class Attribute {
+ private final String name;
+ private final byte[] value;
+
+ /**
+ * @param name attribute name
+ * @param value attribute value; copied, so later mutation of the caller's
+ * array does not affect this attribute
+ * @throws NullPointerException if value is null (clone() dereferences it)
+ */
+ public Attribute(String name, byte[] value) {
+ this.name = name;
+ this.value = value.clone();
+ }
+
+ /**
+ * @return the attribute name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @return a defensive copy of the attribute value bytes
+ */
+ public byte[] getValue() {
+ return value.clone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
new file mode 100644
index 0000000..d991b42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow activity table column families.
+ */
+public enum FlowActivityColumnFamily implements ColumnFamily<FlowActivityTable> {
+
+ /**
+ * Info column family houses known columns, specifically ones included in
+ * columnfamily filters.
+ */
+ INFO("i");
+
+ /**
+ * Byte representation of this column family.
+ */
+ private final byte[] bytes;
+
+ /**
+ * @param value
+ * create a column family with this name. Must be lower case and
+ * without spaces.
+ */
+ private FlowActivityColumnFamily(String value) {
+ // column families should be lower case and not contain any spaces.
+ this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+ }
+
+ /**
+ * @return a copy of the byte representation of this column family.
+ */
+ @Override
+ public byte[] getBytes() {
+ return Bytes.copy(bytes);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
new file mode 100644
index 0000000..b899e5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowActivityTable}.
+ */
+public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> {
+
+ /**
+ * To store run ids of the flows
+ */
+ RUN_ID(FlowActivityColumnFamily.INFO, "r", null);
+
+ // Helper that performs the actual reads/writes for this column family.
+ private final ColumnHelper<FlowActivityTable> column;
+ private final ColumnFamily<FlowActivityTable> columnFamily;
+
+ /**
+ * Can be null for those cases where the provided column qualifier is the
+ * entire column name.
+ */
+ private final String columnPrefix;
+ private final byte[] columnPrefixBytes;
+
+ // Aggregation operation folded into every store(); null for RUN_ID.
+ private final AggregationOperation aggOp;
+
+ /**
+ * Private constructor, meant to be used by the enum definition.
+ *
+ * @param columnFamily
+ * that this column is stored in.
+ * @param columnPrefix
+ * for this column.
+ */
+ private FlowActivityColumnPrefix(
+ ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+ AggregationOperation aggOp) {
+ column = new ColumnHelper<FlowActivityTable>(columnFamily);
+ this.columnFamily = columnFamily;
+ this.columnPrefix = columnPrefix;
+ if (columnPrefix == null) {
+ this.columnPrefixBytes = null;
+ } else {
+ // Future-proof by ensuring the right column prefix hygiene.
+ this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+ .encode(columnPrefix));
+ }
+ this.aggOp = aggOp;
+ }
+
+ /**
+ * @return the column name value
+ */
+ public String getColumnPrefix() {
+ return columnPrefix;
+ }
+
+ /**
+ * @return a copy of the encoded column prefix bytes.
+ * NOTE(review): this would NPE for a constant declared with a null
+ * columnPrefix; the sole constant RUN_ID always has one ("r").
+ */
+ public byte[] getColumnPrefixBytes() {
+ return columnPrefixBytes.clone();
+ }
+
+ /**
+ * @return the aggregation operation for this prefix; null for RUN_ID,
+ * which is constructed with a null aggOp.
+ */
+ public AggregationOperation getAttribute() {
+ return aggOp;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #store(byte[],
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+ */
+ @Override
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<FlowActivityTable> tableMutator, byte[] qualifier,
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifier);
+ // Fold this prefix's aggregation operation into the caller's attributes.
+ Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ attributes, this.aggOp);
+ column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+ combinedAttributes);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+ */
+ // NOTE(review): consider adding @Override to the three read methods below;
+ // they appear to implement the ColumnPrefix interface.
+ public Object readResult(Result result, String qualifier) throws IOException {
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifier);
+ return column.readResult(result, columnQualifier);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResults(org.apache.hadoop.hbase.client.Result)
+ */
+ public Map<String, Object> readResults(Result result) throws IOException {
+ return column.readResults(result, columnPrefixBytes);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+ */
+ public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
+ Result result) throws IOException {
+ return column.readResultsWithTimestamps(result, columnPrefixBytes);
+ }
+
+ /**
+ * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+ * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
+ * if and only if {@code x.equals(y)} or {@code (x == y == null)}
+ *
+ * @param columnPrefix
+ * Name of the column to retrieve
+ * @return the corresponding {@link FlowActivityColumnPrefix} or null
+ */
+ public static final FlowActivityColumnPrefix columnFor(String columnPrefix) {
+
+ // Match column based on value, assume column family matches.
+ for (FlowActivityColumnPrefix flowActivityColPrefix : FlowActivityColumnPrefix
+ .values()) {
+ // Find a match based only on name.
+ if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) {
+ return flowActivityColPrefix;
+ }
+ }
+ // Default to null
+ return null;
+ }
+
+ /**
+ * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+ * is no match. The following holds true:
+ * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+ * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+ *
+ * @param columnFamily
+ * The columnFamily for which to retrieve the column.
+ * @param columnPrefix
+ * Name of the column to retrieve
+ * @return the corresponding {@link FlowActivityColumnPrefix} or null if both
+ * arguments don't match.
+ */
+ public static final FlowActivityColumnPrefix columnFor(
+ FlowActivityColumnFamily columnFamily, String columnPrefix) {
+
+ // TODO: needs unit test to confirm and need to update javadoc to explain
+ // null prefix case.
+
+ for (FlowActivityColumnPrefix flowActivityColumnPrefix : FlowActivityColumnPrefix
+ .values()) {
+ // Find a match based column family and on name.
+ if (flowActivityColumnPrefix.columnFamily.equals(columnFamily)
+ && (((columnPrefix == null) && (flowActivityColumnPrefix
+ .getColumnPrefix() == null)) || (flowActivityColumnPrefix
+ .getColumnPrefix().equals(columnPrefix)))) {
+ return flowActivityColumnPrefix;
+ }
+ }
+ // Default to null
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+ * #store(byte[],
+ * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+ * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+ */
+ @Override
+ public void store(byte[] rowKey,
+ TypedBufferedMutator<FlowActivityTable> tableMutator, String qualifier,
+ Long timestamp, Object inputValue, Attribute... attributes)
+ throws IOException {
+ // Null check
+ if (qualifier == null) {
+ throw new IOException("Cannot store column with null qualifier in "
+ + tableMutator.getName().getNameAsString());
+ }
+
+ byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+ this.columnPrefixBytes, qualifier);
+ Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+ attributes, this.aggOp);
+ // NOTE(review): unlike the byte[]-qualifier overload above, this one
+ // ignores the caller-supplied timestamp and passes null — confirm this
+ // is intentional (e.g. to let the server assign cell timestamps).
+ column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
+ combinedAttributes);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a469bfe7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
new file mode 100644
index 0000000..19e4e83
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+/**
+ * Represents a rowkey for the flow activity table. The key has the form
+ * {@code clusterId!inverted(dayTimestamp)!userId!flowId}, where the day
+ * timestamp is inverted so newer days sort first.
+ */
+public class FlowActivityRowKey {
+
+ // Components of the parsed/constructed row key.
+ private final String cluster;
+ private final long day;
+ private final String user;
+ private final String flow;
+
+ public FlowActivityRowKey(String clusterId, long dayTs, String userId,
+ String flowId) {
+ this.cluster = clusterId;
+ this.day = dayTs;
+ this.user = userId;
+ this.flow = flowId;
+ }
+
+ public String getClusterId() {
+ return cluster;
+ }
+
+ public long getDayTimestamp() {
+ return day;
+ }
+
+ public String getUserId() {
+ return user;
+ }
+
+ public String getFlowId() {
+ return flow;
+ }
+
+ /**
+ * Constructs a row key for the flow activity table as follows:
+ * {@code clusterId!dayTimestamp!user!flowId}, using the top of the current
+ * day as the day timestamp.
+ *
+ * @param clusterId
+ * @param userId
+ * @param flowId
+ * @return byte array with the row key prefix
+ */
+ public static byte[] getRowKey(String clusterId, String userId, String flowId) {
+ return getRowKey(clusterId,
+ TimelineWriterUtils.getTopOfTheDayTimestamp(System.currentTimeMillis()),
+ userId, flowId);
+ }
+
+ /**
+ * Constructs a row key for the flow activity table as follows:
+ * {@code clusterId!dayTimestamp!user!flowId}
+ *
+ * @param clusterId
+ * @param dayTs
+ * @param userId
+ * @param flowId
+ * @return byte array for the row key
+ */
+ public static byte[] getRowKey(String clusterId, long dayTs, String userId,
+ String flowId) {
+ byte[] clusterBytes = Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId));
+ // Invert so that more recent days sort ahead of older ones.
+ byte[] dayBytes = Bytes.toBytes(TimelineWriterUtils.invert(dayTs));
+ byte[] userBytes = Bytes.toBytes(Separator.QUALIFIERS.encode(userId));
+ byte[] flowBytes = Bytes.toBytes(Separator.QUALIFIERS.encode(flowId));
+ return Separator.QUALIFIERS.join(clusterBytes, dayBytes, userBytes,
+ flowBytes);
+ }
+
+ /**
+ * Given the raw row key as bytes, returns the row key as an object.
+ */
+ public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
+ byte[][] components = Separator.QUALIFIERS.split(rowKey);
+ if (components.length < 4) {
+ throw new IllegalArgumentException(
+ "the row key is not valid for a flow activity");
+ }
+ long dayTs = TimelineWriterUtils.invert(Bytes.toLong(components[1]));
+ return new FlowActivityRowKey(decode(components[0]), dayTs,
+ decode(components[2]), decode(components[3]));
+ }
+
+ /** Decodes one separator-encoded row key component back to a String. */
+ private static String decode(byte[] component) {
+ return Separator.QUALIFIERS.decode(Bytes.toString(component));
+ }
+}