You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2015/09/17 19:51:40 UTC

[3/3] hadoop git commit: YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)

YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b37985e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b37985e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b37985e

Branch: refs/heads/YARN-2928
Commit: 4b37985e6a238c7f84eecc4e7eadd30dffc90b88
Parents: b1960e0
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Sep 17 10:34:52 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Sep 17 10:34:52 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop-yarn-server-timelineservice/pom.xml  |  13 +
 .../storage/HBaseTimelineWriterImpl.java        | 179 ++++++-
 .../storage/TimelineSchemaCreator.java          |  22 +-
 .../storage/application/ApplicationColumn.java  |   5 +-
 .../application/ApplicationColumnPrefix.java    |  15 +-
 .../storage/apptoflow/AppToFlowColumn.java      |   6 +-
 .../timelineservice/storage/common/Column.java  |   6 +-
 .../storage/common/ColumnHelper.java            |  93 +++-
 .../storage/common/ColumnPrefix.java            |  28 +-
 .../storage/common/TimelineWriterUtils.java     | 185 +++++++
 .../storage/common/TimestampGenerator.java      | 112 +++++
 .../storage/common/package-info.java            |  24 -
 .../storage/entity/EntityColumn.java            |   6 +-
 .../storage/entity/EntityColumnPrefix.java      |  20 +-
 .../flow/AggregationCompactionDimension.java    |  63 +++
 .../storage/flow/AggregationOperation.java      |  87 ++++
 .../timelineservice/storage/flow/Attribute.java |  39 ++
 .../storage/flow/FlowActivityColumnFamily.java  |  54 +++
 .../storage/flow/FlowActivityColumnPrefix.java  | 243 ++++++++++
 .../storage/flow/FlowActivityRowKey.java        | 113 +++++
 .../storage/flow/FlowActivityTable.java         | 107 ++++
 .../storage/flow/FlowRunColumn.java             | 161 ++++++
 .../storage/flow/FlowRunColumnFamily.java       |  54 +++
 .../storage/flow/FlowRunColumnPrefix.java       | 239 +++++++++
 .../storage/flow/FlowRunCoprocessor.java        | 210 ++++++++
 .../storage/flow/FlowRunRowKey.java             |  50 ++
 .../storage/flow/FlowRunTable.java              | 141 ++++++
 .../storage/flow/FlowScanner.java               | 486 +++++++++++++++++++
 .../storage/TestHBaseTimelineStorage.java       |  28 +-
 .../storage/flow/TestFlowDataGenerator.java     | 213 ++++++++
 .../flow/TestHBaseStorageFlowActivity.java      | 372 ++++++++++++++
 .../storage/flow/TestHBaseStorageFlowRun.java   | 290 +++++++++++
 33 files changed, 3562 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0d43135..cc0f014 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -103,6 +103,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4102. Add a "skip existing table" mode for timeline schema creator (Li
     Lu via sjlee)
 
+    YARN-3901. Populate flow run data in the flow_run & flow activity tables
+    (Vrushali C via sjlee)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index da7fadf..758feb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -174,6 +174,19 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <additionnalDependencies>
+            <additionnalDependency>
+              <groupId>junit</groupId>
+              <artifactId>junit</artifactId>
+              <version>4.11</version>
+            </additionnalDependency>
+          </additionnalDependencies>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 772002d..7c4a5da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -33,11 +33,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -53,23 +52,36 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 
 /**
- * This implements a hbase based backend for storing application timeline entity
+ * This implements a hbase based backend for storing the timeline entity
  * information.
+ * It writes to multiple tables at the backend
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class HBaseTimelineWriterImpl extends AbstractService implements
     TimelineWriter {
 
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineWriterImpl.class);
+
   private Connection conn;
   private TypedBufferedMutator<EntityTable> entityTable;
   private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
   private TypedBufferedMutator<ApplicationTable> applicationTable;
-
-  private static final Log LOG = LogFactory
-      .getLog(HBaseTimelineWriterImpl.class);
+  private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+  private TypedBufferedMutator<FlowRunTable> flowRunTable;
 
   public HBaseTimelineWriterImpl() {
     super(HBaseTimelineWriterImpl.class.getName());
@@ -91,6 +103,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
     appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
     applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
+    flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
+    flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -111,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
       // if the entity is the application, the destination is the application
       // table
-      boolean isApplication = isApplicationEntity(te);
+      boolean isApplication = TimelineWriterUtils.isApplicationEntity(te);
       byte[] rowKey = isApplication ?
           ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
               appId) :
@@ -124,37 +138,139 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       storeMetrics(rowKey, te.getMetrics(), isApplication);
       storeRelations(rowKey, te, isApplication);
 
-      if (isApplicationCreated(te)) {
-        onApplicationCreated(
-            clusterId, userId, flowName, flowVersion, flowRunId, appId, te);
+      if (isApplication) {
+        if (TimelineWriterUtils.isApplicationCreated(te)) {
+          onApplicationCreated(clusterId, userId, flowName, flowVersion,
+              flowRunId, appId, te);
+        }
+        // if it's an application entity, store metrics
+        storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
+            appId, te);
+        // if application has finished, store it's finish time and write final
+        // values
+        // of all metrics
+        if (TimelineWriterUtils.isApplicationFinished(te)) {
+          onApplicationFinished(clusterId, userId, flowName, flowVersion,
+              flowRunId, appId, te);
+        }
       }
     }
     return putStatus;
   }
 
-  private static boolean isApplicationEntity(TimelineEntity te) {
-    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  private void onApplicationCreated(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    // store in App to flow table
+    storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId,
+        appId, te);
+    // store in flow run table
+    storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
+        flowRunId, appId, te);
+    // store in flow activity table
+    storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
+        flowRunId, appId, te);
   }
 
-  private static boolean isApplicationCreated(TimelineEntity te) {
-    if (isApplicationEntity(te)) {
-      for (TimelineEvent event : te.getEvents()) {
-        if (event.getId().equals(
-            ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
-          return true;
-        }
-      }
-    }
-    return false;
+  /*
+   * updates the {@link FlowActivityTable} with the Application TimelineEntity
+   * information
+   */
+  private void storeInFlowActivityTable(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName);
+    byte[] qualifier = GenericObjectMapper.write(flowRunId);
+    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+        null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
   }
 
-  private void onApplicationCreated(String clusterId, String userId,
+  /*
+   * updates the {@link FlowRunTable} with Application Created information
+   */
+  private void storeAppCreatedInFlowRunTable(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+        flowRunId);
+    FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
+        te.getCreatedTime(),
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  private void storeInAppToFlowTable(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
       TimelineEntity te) throws IOException {
     byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
     AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
-    AppToFlowColumn.FLOW_RUN_ID.store(
-        rowKey, appToFlowTable, null, flowRunId);
+    AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
+  }
+
+  /*
+   * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
+   * application has finished
+   */
+  private void onApplicationFinished(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    // store in flow run table
+    storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
+        appId, te);
+
+    // indicate in the flow activity table that the app has finished
+    storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
+        flowRunId, appId, te);
+  }
+
+  /*
+   * Update the {@link FlowRunTable} with Application Finished information
+   */
+  private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
+      String flowName, long flowRunId, String appId, TimelineEntity te)
+      throws IOException {
+    byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+        flowRunId);
+    Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
+        .getAttribute(appId);
+    FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
+        TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId);
+
+    // store the final value of metrics since application has finished
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      storeFlowMetrics(rowKey, metrics, attributeAppId,
+          AggregationOperation.SUM_FINAL.getAttribute());
+    }
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} with Application Metrics
+   */
+  private void storeFlowMetricsAppRunning(String clusterId, String userId,
+      String flowName, long flowRunId, String appId, TimelineEntity te)
+      throws IOException {
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+          flowRunId);
+      storeFlowMetrics(rowKey, metrics,
+          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+    }
+  }
+
+  private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      Attribute... attributes) throws IOException {
+    for (TimelineMetric metric : metrics) {
+      String metricColumnQualifier = metric.getId();
+      Map<Long, Number> timeseries = metric.getValues();
+      for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+        Long timestamp = timeseriesEntry.getKey();
+        FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
+            metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
+            attributes);
+      }
+    }
   }
 
   private void storeRelations(byte[] rowKey, TimelineEntity te,
@@ -184,7 +300,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       // id3?id4?id5
       String compoundValue =
           Separator.VALUES.joinEncoded(connectedEntity.getValue());
-
       columnPrefix.store(rowKey, table, connectedEntity.getKey(), null,
           compoundValue);
     }
@@ -342,6 +457,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     entityTable.flush();
     appToFlowTable.flush();
     applicationTable.flush();
+    flowRunTable.flush();
+    flowActivityTable.flush();
   }
 
   /**
@@ -364,6 +481,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       LOG.info("closing the application table");
       applicationTable.close();
     }
+    if (flowRunTable != null) {
+      LOG.info("closing the flow run table");
+      // The close API performs flushing and releases any resources held
+      flowRunTable.close();
+    }
+    if (flowActivityTable != null) {
+      LOG.info("closing the flowActivityTable table");
+      // The close API performs flushing and releases any resources held
+      flowActivityTable.close();
+    }
     if (conn != null) {
       LOG.info("closing the hbase Connection");
       conn.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index e7e51a7..cbcff4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 
 /**
  * This creates the schema for a hbase based backend for storing application
@@ -199,7 +201,7 @@ public class TimelineSchemaCreator {
     return commandLine;
   }
 
-  private static void createAllTables(Configuration hbaseConf,
+  public static void createAllTables(Configuration hbaseConf,
       boolean skipExisting) throws IOException {
 
     Connection conn = null;
@@ -236,6 +238,24 @@ public class TimelineSchemaCreator {
           throw e;
         }
       }
+      try {
+        new FlowRunTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowActivityTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
     } finally {
       if (conn != null) {
         conn.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
index c028386..802626d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Identifies fully qualified columns for the {@link ApplicationTable}.
@@ -76,9 +77,9 @@ public enum ApplicationColumn implements Column<ApplicationTable> {
 
   public void store(byte[] rowKey,
       TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
-      Object inputValue) throws IOException {
+      Object inputValue, Attribute... attributes) throws IOException {
     column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
-        inputValue);
+        inputValue, attributes);
   }
 
   public Object readResult(Result result) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index ad1def6..d7b5773 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Identifies partially qualified columns for the application table.
@@ -112,7 +113,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
    */
   public void store(byte[] rowKey,
       TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
-      Long timestamp, Object inputValue) throws IOException {
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
 
     // Null check
     if (qualifier == null) {
@@ -123,8 +125,9 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
     byte[] columnQualifier =
         ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
 
-    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
-  }
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+ }
 
   /*
    * (non-Javadoc)
@@ -137,7 +140,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
    */
   public void store(byte[] rowKey,
       TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
-      Long timestamp, Object inputValue) throws IOException {
+      Long timestamp, Object inputValue, Attribute...attributes)
+      throws IOException {
 
     // Null check
     if (qualifier == null) {
@@ -148,7 +152,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
     byte[] columnQualifier =
         ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
 
-    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
index 423037a..859fdca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
@@ -25,8 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Identifies fully qualified columns for the {@link AppToFlowTable}.
@@ -67,9 +69,9 @@ public enum AppToFlowColumn implements Column<AppToFlowTable> {
 
   public void store(byte[] rowKey,
       TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
-      Object inputValue) throws IOException {
+      Object inputValue, Attribute... attributes) throws IOException {
     column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
-        inputValue);
+        inputValue, attributes);
   }
 
   public Object readResult(Result result) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
index 3397d62..64c1cda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * A Column represents the way to store a fully qualified column in a specific
@@ -38,12 +39,15 @@ public interface Column<T> {
    *          column.
    * @param timestamp version timestamp. When null the server timestamp will be
    *          used.
+   * @param attributes Map of attributes for this mutation. used in the coprocessor
+   *          to set/read the cell tags. Can be null.
    * @param inputValue the value to write to the rowKey and column qualifier.
    *          Nothing gets written when null.
    * @throws IOException
    */
   public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
-      Long timestamp, Object inputValue) throws IOException;
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException;
 
   /**
    * Get the latest version of this specified column. Note: this call clones the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index f1b7c58..3a2e088 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 /**
  * This class is meant to be used only by explicit Columns, and not directly to
  * write by clients.
@@ -58,31 +59,66 @@ public class ColumnHelper<T> {
    * Sends a Mutation to the table. The mutations will be buffered and sent over
    * the wire as part of a batch.
    *
-   * @param rowKey identifying the row to write. Nothing gets written when null.
-   * @param tableMutator used to modify the underlying HBase table
-   * @param columnQualifier column qualifier. Nothing gets written when null.
-   * @param timestamp version timestamp. When null the server timestamp will be
-   *          used.
-   * @param inputValue the value to write to the rowKey and column qualifier.
-   *          Nothing gets written when null.
+   * @param rowKey
+   *          identifying the row to write. Nothing gets written when null.
+   * @param tableMutator
+   *          used to modify the underlying HBase table
+   * @param columnQualifier
+   *          column qualifier. Nothing gets written when null.
+   * @param timestamp
+   *          version timestamp. When null the current timestamp multiplied with
+   *          TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of
+   *          app id will be used
+   * @param inputValue
+   *          the value to write to the rowKey and column qualifier. Nothing
+   *          gets written when null.
    * @throws IOException
    */
   public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
-      byte[] columnQualifier, Long timestamp, Object inputValue)
-      throws IOException {
+      byte[] columnQualifier, Long timestamp, Object inputValue,
+      Attribute... attributes) throws IOException {
     if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
       return;
     }
     Put p = new Put(rowKey);
+    timestamp = getPutTimestamp(timestamp, attributes);
+    p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
+        GenericObjectMapper.write(inputValue));
+    if ((attributes != null) && (attributes.length > 0)) {
+      for (Attribute attribute : attributes) {
+        p.setAttribute(attribute.getName(), attribute.getValue());
+      }
+    }
+    tableMutator.mutate(p);
+  }
 
+  /*
+   * Figures out the cell timestamp used in the Put For storing into flow run
+   * table. We would like to left shift the timestamp and supplement it with the
+   * AppId id so that there are no collisions in the flow run table's cells
+   */
+  private long getPutTimestamp(Long timestamp, Attribute[] attributes) {
     if (timestamp == null) {
-      p.addColumn(columnFamilyBytes, columnQualifier,
-          GenericObjectMapper.write(inputValue));
-    } else {
-      p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
-          GenericObjectMapper.write(inputValue));
+      timestamp = System.currentTimeMillis();
     }
-    tableMutator.mutate(p);
+    String appId = getAppIdFromAttributes(attributes);
+    long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
+        timestamp, appId);
+    return supplementedTS;
+  }
+
+  private String getAppIdFromAttributes(Attribute[] attributes) {
+    if (attributes == null) {
+      return null;
+    }
+    String appId = null;
+    for (Attribute attribute : attributes) {
+      if (AggregationCompactionDimension.APPLICATION_ID.toString().equals(
+          attribute.getName())) {
+        appId = Bytes.toString(attribute.getValue());
+      }
+    }
+    return appId;
   }
 
   /**
@@ -171,7 +207,9 @@ public class ColumnHelper<T> {
               for (Entry<Long, byte[]> cell : cells.entrySet()) {
                 V value =
                     (V) GenericObjectMapper.read(cell.getValue());
-                cellResults.put(cell.getKey(), value);
+                cellResults.put(
+                    TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
+                    value);
               }
             }
             results.put(columnName, cellResults);
@@ -315,6 +353,27 @@ public class ColumnHelper<T> {
   /**
    * @param columnPrefixBytes The byte representation for the column prefix.
    *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier for the remainder of the column.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the encoded
+   *         qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      long qualifier) {
+
+    if (columnPrefixBytes == null) {
+      return Bytes.toBytes(qualifier);
+    }
+
+    // Convert qualifier to lower case, strip of separators and tag on column
+    // prefix.
+    byte[] columnQualifier =
+        Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier));
+    return columnQualifier;
+  }
+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
    * @param qualifier the byte representation for the remainder of the column.
    * @return fully sanitized column qualifier that is a combination of prefix
    *         and qualifier. If prefix is null, the result is simply the encoded

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index 509ff49..db49098 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -23,6 +23,7 @@ import java.util.NavigableMap;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Used to represent a partially qualified column, where the actual column name
@@ -43,12 +44,36 @@ public interface ColumnPrefix<T> {
    * @param qualifier column qualifier. Nothing gets written when null.
    * @param timestamp version timestamp. When null the server timestamp will be
    *          used.
+   *@param attributes attributes for the mutation that are used by the coprocessor
+   *          to set/read the cell tags
    * @param inputValue the value to write to the rowKey and column qualifier.
    *          Nothing gets written when null.
    * @throws IOException
    */
   public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
-      String qualifier, Long timestamp, Object inputValue) throws IOException;
+      byte[] qualifier, Long timestamp, Object inputValue,
+      Attribute... attributes) throws IOException;
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent over
+   * the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has this
+   *          column.
+   * @param qualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   *@param attributes attributes for the mutation that are used by the coprocessor
+   *          to set/read the cell tags
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      String qualifier, Long timestamp, Object inputValue,
+      Attribute... attributes) throws IOException;
 
   /**
    * Get the latest version of this specified column. Note: this call clones the
@@ -81,4 +106,5 @@ public interface ColumnPrefix<T> {
    */
   public <V> NavigableMap<String, NavigableMap<Long, V>>
       readResultsWithTimestamps(Result result) throws IOException;
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
index 58bdedc7e..371371a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
@@ -19,9 +19,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * bunch of utility functions used across TimelineWriter classes
@@ -36,6 +46,9 @@ public class TimelineWriterUtils {
   /** indicator for no limits for splitting */
   public static final int NO_LIMIT_SPLIT = -1;
 
+  /** milliseconds in one day */
+  public static final long MILLIS_ONE_DAY = 86400000L;
+
   /**
    * Splits the source array into multiple array segments using the given
    * separator, up to a maximum of count items. This will naturally produce
@@ -140,4 +153,176 @@ public class TimelineWriterUtils {
     return Long.MAX_VALUE - key;
   }
 
+  /**
+   * returns the timestamp of that day's start (which is midnight 00:00:00 AM)
+   * for a given input timestamp
+   *
+   * @param ts
+   * @return timestamp of that day's beginning (midnight)
+   */
+  public static long getTopOfTheDayTimestamp(long ts) {
+    long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
+    return dayTimestamp;
+  }
+
+  /**
+   * Combines the input array of attributes and the input aggregation operation
+   * into a new array of attributes.
+   *
+   * @param attributes
+   * @param aggOp
+   * @return array of combined attributes
+   */
+  public static Attribute[] combineAttributes(Attribute[] attributes,
+      AggregationOperation aggOp) {
+    int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
+    Attribute[] combinedAttributes = new Attribute[newLength];
+
+    if (attributes != null) {
+      System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
+    }
+
+    if (aggOp != null) {
+      Attribute a2 = aggOp.getAttribute();
+      combinedAttributes[newLength - 1] = a2;
+    }
+    return combinedAttributes;
+  }
+
+  /**
+   * Returns a number for the new array size. The new array is the combination
+   * of input array of attributes and the input aggregation operation.
+   *
+   * @param attributes
+   * @param aggOp
+   * @return the size for the new array
+   */
+  private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+      AggregationOperation aggOp) {
+    int oldLength = getAttributesLength(attributes);
+    int aggLength = getAppOpLength(aggOp);
+    return oldLength + aggLength;
+  }
+
+  private static int getAppOpLength(AggregationOperation aggOp) {
+    if (aggOp != null) {
+      return 1;
+    }
+    return 0;
+  }
+
+  private static int getAttributesLength(Attribute[] attributes) {
+    if (attributes != null) {
+      return attributes.length;
+    }
+    return 0;
+  }
+
+  /**
+   * checks if an application has finished
+   *
+   * @param te
+   * @return true if application has finished else false
+   */
+  public static boolean isApplicationFinished(TimelineEntity te) {
+    SortedSet<TimelineEvent> allEvents = te.getEvents();
+    if ((allEvents != null) && (allEvents.size() > 0)) {
+      TimelineEvent event = allEvents.last();
+      if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * get the time at which an app finished
+   *
+   * @param te
+   * @return true if application has finished else false
+   */
+  public static long getApplicationFinishedTime(TimelineEntity te) {
+    SortedSet<TimelineEvent> allEvents = te.getEvents();
+    if ((allEvents != null) && (allEvents.size() > 0)) {
+      TimelineEvent event = allEvents.last();
+      if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+        return event.getTimestamp();
+      }
+    }
+    return 0l;
+  }
+
+  /**
+   * Checks if the input TimelineEntity object is an ApplicationEntity.
+   *
+   * @param te
+   * @return true if input is an ApplicationEntity, false otherwise
+   */
+  public static boolean isApplicationEntity(TimelineEntity te) {
+    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  }
+
+  /**
+   * Checks for the APPLICATION_CREATED event.
+   *
+   * @param te
+   * @return true is application event exists, false otherwise
+   */
+  public static boolean isApplicationCreated(TimelineEntity te) {
+    if (isApplicationEntity(te)) {
+      for (TimelineEvent event : te.getEvents()) {
+        if (event.getId()
+            .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the first seen aggregation operation as seen in the list of input
+   * tags or null otherwise
+   *
+   * @param tags
+   * @return AggregationOperation
+   */
+  public static AggregationOperation getAggregationOperationFromTagsList(
+      List<Tag> tags) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      for (Tag tag : tags) {
+        if (tag.getType() == aggOp.getTagType()) {
+          return aggOp;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a {@link Tag} from the input attribute.
+   *
+   * @param attribute
+   * @return Tag
+   */
+  public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
+    // attribute could be either an Aggregation Operation or
+    // an Aggregation Dimension
+    // Get the Tag type from either
+    AggregationOperation aggOp = AggregationOperation
+        .getAggregationOperation(attribute.getKey());
+    if (aggOp != null) {
+      Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+      return t;
+    }
+
+    AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
+        .getAggregationCompactionDimension(attribute.getKey());
+    if (aggCompactDim != null) {
+      Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+      return t;
+    }
+    return null;
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
new file mode 100644
index 0000000..555b64e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * Utility class that allows HBase coprocessors to interact with unique
+ * timestamps.
+ */
+public class TimestampGenerator {
+
+  /*
+   * if this is changed, then reading cell timestamps written with older
+   * multiplier value will not work
+   */
+  public static final long TS_MULTIPLIER = 1000L;
+
+  private final AtomicLong lastTimestamp = new AtomicLong();
+
+  /**
+   * Returns the current wall clock time in milliseconds, multiplied by the
+   * required precision.
+   */
+  public long currentTime() {
+    // We want to align cell timestamps with current time.
+    // cell timestamps are not be less than
+    // System.currentTimeMillis() * TS_MULTIPLIER.
+    return System.currentTimeMillis() * TS_MULTIPLIER;
+  }
+
+  /**
+   * Returns a timestamp value unique within the scope of this
+   * {@code TimestampGenerator} instance. For usage by HBase
+   * {@code RegionObserver} coprocessors, this normally means unique within a
+   * given region.
+   *
+   * Unlikely scenario of generating a non-unique timestamp: if there is a
+   * sustained rate of more than 1M hbase writes per second AND if region fails
+   * over within that time range of timestamps being generated then there may be
+   * collisions writing to a cell version of the same column.
+   */
+  public long getUniqueTimestamp() {
+    long lastTs;
+    long nextTs;
+    do {
+      lastTs = lastTimestamp.get();
+      nextTs = Math.max(lastTs + 1, currentTime());
+    } while (!lastTimestamp.compareAndSet(lastTs, nextTs));
+    return nextTs;
+  }
+
+  /**
+   * returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
+   * application id
+   *
+   * Unlikely scenario of generating a timestamp that is a duplicate: If more
+   * than a 1000 concurrent apps are running in one flow run AND write to same
+   * column at the same time, then say appId of 1001 will overlap with appId of
+   * 001 and there may be collisions for that flow run's specific column.
+   *
+   * @param incomingTS
+   * @param appId
+   * @return a timestamp multiplied with TS_MULTIPLIER and last few digits of
+   *         application id
+   */
+  public static long getSupplementedTimestamp(long incomingTS, String appId) {
+    long suffix = getAppIdSuffix(appId);
+    long outgoingTS = incomingTS * TS_MULTIPLIER + suffix;
+    return outgoingTS;
+
+  }
+
+  private static long getAppIdSuffix(String appIdStr) {
+    if (appIdStr == null) {
+      return 0L;
+    }
+    ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+    long id = appId.getId() % TS_MULTIPLIER;
+    return id;
+  }
+
+  /**
+   * truncates the last few digits of the timestamp which were supplemented by
+   * the TimestampGenerator#getSupplementedTimestamp function
+   *
+   * @param incomingTS
+   * @return a truncated timestamp value
+   */
+  public static long getTruncatedTimestamp(long incomingTS) {
+    return incomingTS / TS_MULTIPLIER;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
deleted file mode 100644
index 32577fb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 26e7748..8ae19b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Identifies fully qualified columns for the {@link EntityTable}.
@@ -81,9 +83,9 @@ public enum EntityColumn implements Column<EntityTable> {
 
   public void store(byte[] rowKey,
       TypedBufferedMutator<EntityTable> tableMutator, Long timestamp,
-      Object inputValue) throws IOException {
+      Object inputValue, Attribute... attributes) throws IOException {
     column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
-        inputValue);
+        inputValue, attributes);
   }
 
   public Object readResult(Result result) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 75ff742..0d4e5a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Identifies partially qualified columns for the entity table.
@@ -108,11 +109,13 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
    * #store(byte[],
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.
-   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
    */
   public void store(byte[] rowKey,
       TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
-      Long timestamp, Object inputValue) throws IOException {
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
 
     // Null check
     if (qualifier == null) {
@@ -123,8 +126,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
     byte[] columnQualifier =
         ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
 
-    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
-  }
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+ }
 
   /*
    * (non-Javadoc)
@@ -137,7 +141,8 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    */
   public void store(byte[] rowKey,
       TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
-      Long timestamp, Object inputValue) throws IOException {
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
 
     // Null check
     if (qualifier == null) {
@@ -148,8 +153,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
     byte[] columnQualifier =
         ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
 
-    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
-  }
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+ }
 
   /*
    * (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
new file mode 100644
index 0000000..ff12c7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the compaction dimensions for the data in the {@link FlowRunTable}
+ * .
+ */
+public enum AggregationCompactionDimension {
+
+  /**
+   * the application id
+   */
+  APPLICATION_ID((byte) 101);
+
+  private byte tagType;
+  private byte[] inBytes;
+
+  private AggregationCompactionDimension(byte tagType) {
+    this.tagType = tagType;
+    this.inBytes = Bytes.toBytes(this.name());
+  }
+
+  public Attribute getAttribute(String attributeValue) {
+    return new Attribute(this.name(), Bytes.toBytes(attributeValue));
+  }
+
+  public byte getTagType() {
+    return tagType;
+  }
+
+  public byte[] getInBytes() {
+    return this.inBytes.clone();
+  }
+
+  public static AggregationCompactionDimension getAggregationCompactionDimension(
+      String aggCompactDimStr) {
+    for (AggregationCompactionDimension aggDim : AggregationCompactionDimension
+        .values()) {
+      if (aggDim.name().equals(aggCompactDimStr)) {
+        return aggDim;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
new file mode 100644
index 0000000..c635ce6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the attributes to be set for puts into the {@link FlowRunTable}.
+ * The numbers used for tagType are prime numbers
+ */
+public enum AggregationOperation {
+
+  /**
+   * When the flow was started.
+   */
+  MIN((byte) 71),
+
+  /**
+   * When it ended.
+   */
+  MAX((byte) 73),
+
+  /**
+   * The metrics of the flow
+   */
+  SUM((byte) 79),
+
+  /**
+   * application running
+   */
+  SUM_FINAL((byte) 83),
+
+  /**
+   * compact
+   */
+  COMPACT((byte) 89);
+
+  private byte tagType;
+  private byte[] inBytes;
+
+  private AggregationOperation(byte tagType) {
+    this.tagType = tagType;
+    this.inBytes = Bytes.toBytes(this.name());
+  }
+
+  public Attribute getAttribute() {
+    return new Attribute(this.name(), this.inBytes);
+  }
+
+  public byte getTagType() {
+    return tagType;
+  }
+
+  public byte[] getInBytes() {
+    return this.inBytes.clone();
+  }
+
+  /**
+   * returns the AggregationOperation enum that represents that string
+   * @param aggOpStr
+   * @return the AggregationOperation enum that represents that string
+   */
+  public static AggregationOperation getAggregationOperation(String aggOpStr) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      if (aggOp.name().equals(aggOpStr)) {
+        return aggOp;
+      }
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
new file mode 100644
index 0000000..d3de518
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+/**
+ * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}.
+ */
+public class Attribute {
+  private final String name;
+  private final byte[] value;
+
+  public Attribute(String name, byte[] value) {
+    this.name = name;
+    this.value = value.clone();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public byte[] getValue() {
+    return value.clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
new file mode 100644
index 0000000..d991b42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow run table column families.
+ */
+public enum FlowActivityColumnFamily implements ColumnFamily<FlowActivityTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value
+   *          create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private FlowActivityColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
new file mode 100644
index 0000000..b899e5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowActivityTable}
+ */
+public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> {
+
+  /**
+   * To store run ids of the flows
+   */
+  RUN_ID(FlowActivityColumnFamily.INFO, "r", null);
+
+  private final ColumnHelper<FlowActivityTable> column;
+  private final ColumnFamily<FlowActivityTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  private final AggregationOperation aggOp;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily
+   *          that this column is stored in.
+   * @param columnPrefix
+   *          for this column.
+   */
+  private FlowActivityColumnPrefix(
+      ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+      AggregationOperation aggOp) {
+    column = new ColumnHelper<FlowActivityTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+          .encode(columnPrefix));
+    }
+    this.aggOp = aggOp;
+  }
+
+  /**
+   * @return the column name value
+   */
+  public String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  public byte[] getColumnPrefixBytes() {
+    return columnPrefixBytes.clone();
+  }
+
+  public AggregationOperation getAttribute() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<String, Object> readResults(Result result) throws IOException {
+    return column.readResults(result, columnPrefixBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   */
+  public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
+      Result result) throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  }
+
+  /**
+   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
+   * if and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null
+   */
+  public static final FlowActivityColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowActivityColumnPrefix flowActivityColPrefix : FlowActivityColumnPrefix
+        .values()) {
+      // Find a match based only on name.
+      if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) {
+        return flowActivityColPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true:
+   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+   * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final FlowActivityColumnPrefix columnFor(
+      FlowActivityColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+
+    for (FlowActivityColumnPrefix flowActivityColumnPrefix : FlowActivityColumnPrefix
+        .values()) {
+      // Find a match based column family and on name.
+      if (flowActivityColumnPrefix.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (flowActivityColumnPrefix
+              .getColumnPrefix() == null)) || (flowActivityColumnPrefix
+              .getColumnPrefix().equals(columnPrefix)))) {
+        return flowActivityColumnPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
+        combinedAttributes);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
new file mode 100644
index 0000000..19e4e83
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+/**
+ * Represents a rowkey for the flow activity table.
+ */
+public class FlowActivityRowKey {
+
+  private final String clusterId;
+  private final long dayTs;
+  private final String userId;
+  private final String flowId;
+
+  public FlowActivityRowKey(String clusterId, long dayTs, String userId,
+      String flowId) {
+    this.clusterId = clusterId;
+    this.dayTs = dayTs;
+    this.userId = userId;
+    this.flowId = flowId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public long getDayTimestamp() {
+    return dayTs;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowId}
+   *
+   * Will insert into current day's record in the table
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @return byte array with the row key prefix
+   */
+  public static byte[] getRowKey(String clusterId, String userId, String flowId) {
+    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+        .currentTimeMillis());
+    return getRowKey(clusterId, dayTs, userId, flowId);
+  }
+
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowId}
+   *
+   * @param clusterId
+   * @param dayTs
+   * @param userId
+   * @param flowId
+   * @return byte array for the row key
+   */
+  public static byte[] getRowKey(String clusterId, long dayTs, String userId,
+      String flowId) {
+    return Separator.QUALIFIERS.join(
+        Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
+        Bytes.toBytes(TimelineWriterUtils.invert(dayTs)),
+        Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
+        Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
+  }
+
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   */
+  public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 4) {
+      throw new IllegalArgumentException("the row key is not valid for "
+          + "a flow activity");
+    }
+
+    String clusterId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[0]));
+    long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
+    String userId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[2]));
+    String flowId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[3]));
+    return new FlowActivityRowKey(clusterId, dayTs, userId, flowId);
+  }
+}