Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2015/09/17 19:51:38 UTC

[1/3] hadoop git commit: YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 b1960e0d2 -> 4b37985e6


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
new file mode 100644
index 0000000..b4a0c74
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowActivity table.
+ */
+public class TestHBaseStorageFlowActivity {
+
+  private static HBaseTestingUtility util;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
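+    // HFile format version 3 is needed for the cell tags that the
+    // flow run coprocessor in this patch relies on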
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+  }
+
+  /**
+   * Writes 4 timeline entities belonging to one flow run through the
+   * {@link HBaseTimelineWriterImpl}, then checks the flow activity table
+   * contents.
+   *
+   * The first entity has a created event, metrics, and a finish event.
+   *
+   * The second entity has a created event and the smallest start time; this
+   * should become the start time for the flow run.
+   *
+   * The third entity has a finish event and the maximum end time; this should
+   * become the end time for the flow run.
+   *
+   * The fourth entity has a created event whose start time is greater than
+   * the min start time.
+   *
+   * The test verifies that a single flow activity table entry is made for all
+   * 4 application entities, since they belong to the same flow run.
+   */
+  @Test
+  public void testWriteFlowRunMinMax() throws Exception {
+
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(TestFlowDataGenerator.getEntity1());
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+    String user = "testWriteFlowRunMinMaxToHBase_user1";
+    String flow = "testing_flowRun_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+    String appName = "application_100000000000_1111";
+    long endTs = 1439750690000L;
+    TimelineEntity entityMinStartTime = TestFlowDataGenerator
+        .getEntityMinStartTime();
+
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with the right min start time
+      te = new TimelineEntities();
+      te.addEntity(entityMinStartTime);
+      appName = "application_100000000000_3333";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity for max end time
+      TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+          .getEntityMaxEndTime(endTs);
+      te = new TimelineEntities();
+      te.addEntity(entityMaxEndTime);
+      appName = "application_100000000000_4444";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with a greater start time
+      TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+          .getEntityGreaterStartTime();
+      te = new TimelineEntities();
+      te.addEntity(entityGreaterStartTime);
+      appName = "application_1000000000000000_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // flush everything to hbase
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    Connection conn = ConnectionFactory.createConnection(c1);
+    // check in flow activity table
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    Get g = new Get(startRow);
+    Result r1 = table1.get(g);
+    assertNotNull(r1);
+    assertTrue(!r1.isEmpty());
+    Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO
+        .getBytes());
+    assertEquals(1, values.size());
+    byte[] row = r1.getRow();
+    FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row);
+    assertNotNull(flowActivityRowKey);
+    assertEquals(cluster, flowActivityRowKey.getClusterId());
+    assertEquals(user, flowActivityRowKey.getUserId());
+    assertEquals(flow, flowActivityRowKey.getFlowId());
+    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+        .currentTimeMillis());
+    assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+    assertEquals(1, values.size());
+    checkFlowActivityRunId(runid, flowVersion, values);
+  }
+
+  /**
+   * Writes one application entity and checks the record for today in the
+   * flow activity table.
+   */
+  @Test
+  public void testWriteFlowActivityOneFlow() throws Exception {
+    String cluster = "testWriteFlowActivityOneFlow_cluster1";
+    String user = "testWriteFlowActivityOneFlow_user1";
+    String flow = "flow_activity_test_flow_name";
+    String flowVersion = "A122110F135BC4";
+    Long runid = 1001111178919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_1111999999_1234";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+    // check flow activity
+    checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+  }
+
+  private void checkFlowActivityTable(String cluster, String user, String flow,
+      String flowVersion, Long runid, Configuration c1) throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+      rowCount++;
+      byte[] row = result.getRow();
+      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+          .parseRowKey(row);
+      assertNotNull(flowActivityRowKey);
+      assertEquals(cluster, flowActivityRowKey.getClusterId());
+      assertEquals(user, flowActivityRowKey.getUserId());
+      assertEquals(flow, flowActivityRowKey.getFlowId());
+      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+          .currentTimeMillis());
+      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+      assertEquals(1, values.size());
+      checkFlowActivityRunId(runid, flowVersion, values);
+    }
+    assertEquals(1, rowCount);
+  }
+
+  /**
+   * Writes 3 applications, each with a different run id and version, for the
+   * same {cluster, user, flow}.
+   *
+   * They should all be inserted into a single record in the flow activity
+   * table, with 3 columns, one per run id.
+   */
+  @Test
+  public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
+    String cluster = "testManyRunsFlowActivity_cluster1";
+    String user = "testManyRunsFlowActivity_c_user1";
+    String flow = "flow_activity_test_flow_name";
+    String flowVersion1 = "A122110F135BC4";
+    Long runid1 = 11111111111L;
+
+    String flowVersion2 = "A12222222222C4";
+    long runid2 = 2222222222222L;
+
+    String flowVersion3 = "A1333333333C4";
+    long runid3 = 3333333333333L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11888888888_1111";
+      hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
+
+      // write another application to this flow, but with a different runid/version
+      te = new TimelineEntities();
+      te.addEntity(entityApp1);
+      appName = "application_11888888888_2222";
+      hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
+
+      // write another application to this flow, but with a different runid/version
+      te = new TimelineEntities();
+      te.addEntity(entityApp1);
+      appName = "application_11888888888_3333";
+      hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
+
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+    // check flow activity
+    checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
+        runid1, flowVersion2, runid2, flowVersion3, runid3);
+
+  }
+
+  private void checkFlowActivityTableSeveralRuns(String cluster, String user,
+      String flow, Configuration c1, String flowVersion1, Long runid1,
+      String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+      throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      byte[] row = result.getRow();
+      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+          .parseRowKey(row);
+      assertNotNull(flowActivityRowKey);
+      assertEquals(cluster, flowActivityRowKey.getClusterId());
+      assertEquals(user, flowActivityRowKey.getUserId());
+      assertEquals(flow, flowActivityRowKey.getFlowId());
+      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+          .currentTimeMillis());
+      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+      rowCount++;
+      assertEquals(3, values.size());
+      checkFlowActivityRunId(runid1, flowVersion1, values);
+      checkFlowActivityRunId(runid2, flowVersion2, values);
+      checkFlowActivityRunId(runid3, flowVersion3, values);
+    }
+    // The flow activity table inserts into the current day's record;
+    // hence, if this test runs across the midnight boundary, it may fail,
+    // since the writes would be split across two records, one per day.
+    assertEquals(1, rowCount);
+  }
+
+  private void checkFlowActivityRunId(Long runid, String flowVersion,
+      Map<byte[], byte[]> values) throws IOException {
+    byte[] rq = ColumnHelper.getColumnQualifier(
+        FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
+        GenericObjectMapper.write(runid));
+    boolean found = false;
+    for (Map.Entry<byte[], byte[]> k : values.entrySet()) {
+      String actualQ = Bytes.toString(k.getKey());
+      if (Bytes.toString(rq).equals(actualQ)) {
+        String actualV = (String) GenericObjectMapper.read(k.getValue());
+        assertEquals(flowVersion, actualV);
+        found = true;
+      }
+    }
+    // guard against a vacuous pass when the run id column is absent
+    assertTrue(found);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}
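
A note for readers following along: the day timestamp asserted above comes
from TimelineWriterUtils.getTopOfTheDayTimestamp(). A minimal sketch of the
day-rounding this implies (not part of this patch; the real utility may
differ in detail, e.g. around time zones):

    import java.util.concurrent.TimeUnit;

    public final class DayTimestampSketch {
      private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1);

      // Round an epoch-millis timestamp down to 00:00 UTC of the same day.
      // Sketch only; the actual TimelineWriterUtils may differ.
      public static long topOfTheDay(long ts) {
        return ts - (ts % MILLIS_PER_DAY);
      }

      public static void main(String[] args) {
        System.out.println(topOfTheDay(System.currentTimeMillis()));
      }
    }

This is also why the midnight-boundary caveat in
checkFlowActivityTableSeveralRuns() exists: writes on either side of the
boundary round to different day timestamps and hence land in different rows.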

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
new file mode 100644
index 0000000..bf524ea
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowRun table.
+ */
+public class TestHBaseStorageFlowRun {
+
+  private static HBaseTestingUtility util;
+
+  private final String metric1 = "MAP_SLOT_MILLIS";
+  private final String metric2 = "HDFS_BYTES_READ";
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
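+    // HFile format version 3 is needed for the cell tags that the
+    // flow run coprocessor in this patch relies on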
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+  }
+
+  /**
+   * Writes 4 timeline entities belonging to one flow run through the
+   * {@link HBaseTimelineWriterImpl}, then checks the flow run table contents.
+   *
+   * The first entity has a created event, metrics, and a finish event.
+   *
+   * The second entity has a created event and the smallest start time; this
+   * should become the start time for the flow run.
+   *
+   * The third entity has a finish event and the maximum end time; this should
+   * become the end time for the flow run.
+   *
+   * The fourth entity has a created event whose start time is greater than
+   * the min start time.
+   */
+  @Test
+  public void testWriteFlowRunMinMax() throws Exception {
+
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(TestFlowDataGenerator.getEntity1());
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+    String user = "testWriteFlowRunMinMaxToHBase_user1";
+    String flow = "testing_flowRun_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+    String appName = "application_100000000000_1111";
+    long endTs = 1439750690000L;
+    TimelineEntity entityMinStartTime = TestFlowDataGenerator
+        .getEntityMinStartTime();
+
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with the right min start time
+      te = new TimelineEntities();
+      te.addEntity(entityMinStartTime);
+      appName = "application_100000000000_3333";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity for max end time
+      TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+          .getEntityMaxEndTime(endTs);
+      te = new TimelineEntities();
+      te.addEntity(entityMaxEndTime);
+      appName = "application_100000000000_4444";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with a greater start time
+      TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+          .getEntityGreaterStartTime();
+      te = new TimelineEntities();
+      te.addEntity(entityGreaterStartTime);
+      appName = "application_1000000000000000_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // flush everything to hbase
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    Connection conn = ConnectionFactory.createConnection(c1);
+    // check in flow run table
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    // scan the table and see that we get back the right min and max
+    // timestamps
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    Get g = new Get(startRow);
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
+    Result r1 = table1.get(g);
+    assertNotNull(r1);
+    assertTrue(!r1.isEmpty());
+    Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
+        .getBytes());
+
+    assertEquals(2, r1.size());
+    Long starttime = (Long) GenericObjectMapper.read(values
+        .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+    Long expmin = entityMinStartTime.getCreatedTime();
+    assertEquals(expmin, starttime);
+    assertEquals(endTs, GenericObjectMapper.read(values
+        .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
+  }
+
+  boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
+      String flow, Long runid) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+    assertTrue(rowKeyComponents.length == 4);
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    return true;
+  }
+
+  /**
+   * Writes two application entities of the same flow run. Each application has
+   * two metrics: slot millis and hdfs bytes read. Each metric has values at two
+   * timestamps.
+   *
+   * Checks the metric values of the flow in the flow run table. Flow metric
+   * values should be the sum of individual metric values that belong to the
+   * latest timestamp for that metric
+   */
+  @Test
+  public void testWriteFlowRunMetricsOneFlow() throws Exception {
+    String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+    String user = "testWriteFlowRunMetricsOneFlow_user1";
+    String flow = "testing_flowRun_metrics_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      // write another application with same metric to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      te.addEntity(entityApp2);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    // check flow run
+    checkFlowRunTable(cluster, user, flow, runid, c1);
+  }
+
+  private void checkFlowRunTable(String cluster, String user, String flow,
+      long runid, Configuration c1) throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
+          .getBytes());
+      rowCount++;
+      // check metric1
+      byte[] q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
+      assertTrue(values.containsKey(q));
+      assertEquals(141, GenericObjectMapper.read(values.get(q)));
+
+      // check metric2
+      assertEquals(2, values.size());
+      q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
+      assertTrue(values.containsKey(q));
+      assertEquals(57, GenericObjectMapper.read(values.get(q)));
+    }
+    assertEquals(1, rowCount);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}
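
Outside the tests, the aggregated flow run cells can be read back the same
way. A minimal sketch using only APIs that appear in this patch, assuming an
open HBase Connection and the same cluster/user/flow/runid values as above:

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;

    public final class FlowRunReadSketch {
      // Read back the aggregated min start time for one flow run.
      // Sketch only; assumes the schema was created by TimelineSchemaCreator.
      public static Long readMinStartTime(Connection conn, String cluster,
          String user, String flow, long runid) throws IOException {
        byte[] rowKey = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
        try (Table table = conn.getTable(
            TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME))) {
          Get get = new Get(rowKey);
          get.addColumn(FlowRunColumnFamily.INFO.getBytes(),
              FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
          Result result = table.get(get);
          return (Long) GenericObjectMapper.read(
              result.getValue(FlowRunColumnFamily.INFO.getBytes(),
                  FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
        }
      }
    }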


[2/3] hadoop git commit: YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)

Posted by sj...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
new file mode 100644
index 0000000..af8df99
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow activity table has column family info.
+ * It stores the daily activity record for flows and
+ * is useful as a quick lookup of what flows were
+ * running on a given day.
+ *
+ * Example flow activity table record:
+ *
+ * <pre>
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | r!runid1:version1            |
+ * | inv Top of |                              |
+ * | Day!       | r!runid2:version7            |
+ * | userName!  |                              |
+ * | flowId     |                              |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowActivityTable extends BaseTable<FlowActivityTable> {
+  /** flow activity table prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity";
+
+  /** config param name that specifies the flowactivity table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowactivity table name */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.flowactivity";
+
+  private static final Log LOG = LogFactory.getLog(FlowActivityTable.class);
+
+  /** default max number of versions */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowActivityTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    flowActivityTableDescp.addFamily(infoCF);
+
+    // TODO: figure out the split policy before running in production
+    admin.createTable(flowActivityTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
\ No newline at end of file
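
The table is normally created alongside the others via
TimelineSchemaCreator.createAllTables(), as the tests above do, but it can
also be created on its own. A minimal sketch, assuming an open HBase
Connection (the table name can be overridden via TABLE_NAME_CONF_NAME):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;

    public final class CreateFlowActivityTableSketch {
      // Create just the flow activity table. createTable() throws if the
      // table already exists, mirroring the behavior shown above.
      public static void create(Connection conn, Configuration conf)
          throws IOException {
        try (Admin admin = conn.getAdmin()) {
          new FlowActivityTable().createTable(admin, conf);
        }
      }
    }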

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
new file mode 100644
index 0000000..ad30add
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies fully qualified columns for the {@link FlowRunTable}.
+ */
+public enum FlowRunColumn implements Column<FlowRunTable> {
+
+  /**
+   * When the flow was started. This is the minimum of currently known
+   * application start times.
+   */
+  MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
+      AggregationOperation.MIN),
+
+  /**
+   * When the flow ended. This is the maximum of currently known application end
+   * times.
+   */
+  MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
+      AggregationOperation.MAX),
+
+  /**
+   * The version of the flow that this flow run belongs to.
+   */
+  FLOW_VERSION(FlowRunColumnFamily.INFO, "flow_version", null);
+
+  private final ColumnHelper<FlowRunTable> column;
+  private final ColumnFamily<FlowRunTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+  private final AggregationOperation aggOp;
+
+  private FlowRunColumn(ColumnFamily<FlowRunTable> columnFamily,
+      String columnQualifier, AggregationOperation aggOp) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    this.aggOp = aggOp;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE
+        .encode(columnQualifier));
+    this.column = new ColumnHelper<FlowRunTable>(columnFamily);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public byte[] getColumnQualifierBytes() {
+    return columnQualifierBytes.clone();
+  }
+
+  public AggregationOperation getAggregationOperation() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.Column#store
+   * (byte[], org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
+      Object inputValue, Attribute... attributes) throws IOException {
+
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, aggOp);
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue, combinedAttributes);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve a {@link FlowRunColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumn} or null
+   */
+  public static final FlowRunColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowRunColumn ec : FlowRunColumn.values()) {
+      // Find a match based only on name.
+      if (ec.getColumnQualifier().equals(columnQualifier)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve a {@link FlowRunColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param name
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumn}, or null if no column
+   *         matches both arguments.
+   */
+  public static final FlowRunColumn columnFor(FlowRunColumnFamily columnFamily,
+      String name) {
+
+    for (FlowRunColumn ec : FlowRunColumn.values()) {
+      // Find a match based on column family and name.
+      if (ec.columnFamily.equals(columnFamily)
+          && ec.getColumnQualifier().equals(name)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+}
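
The aggregation operation attached in store() is what lets the flow run
coprocessor fold per-application values into a single cell. A minimal
write-side sketch; how the TypedBufferedMutator is obtained, and whether a
null timestamp is acceptable here, are assumptions, not shown in this patch:

    import java.io.IOException;

    import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;

    public final class FlowRunWriteSketch {
      // Record one application's created time against its flow run row.
      // store() attaches the MIN aggregation attribute itself, so the region
      // coprocessor can keep the smallest value across applications.
      public static void writeStartTime(
          TypedBufferedMutator<FlowRunTable> mutator, String cluster,
          String user, String flow, long runid, long createdTime)
          throws IOException {
        byte[] rowKey = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
        // null timestamp: assumed to let ColumnHelper pick one
        FlowRunColumn.MIN_START_TIME.store(rowKey, mutator, null, createdTime);
      }
    }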

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
new file mode 100644
index 0000000..8faf5f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow run table column families.
+ */
+public enum FlowRunColumnFamily implements ColumnFamily<FlowRunTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value
+   *          create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private FlowRunColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
new file mode 100644
index 0000000..d55f510
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowRunTable}.
+ */
+public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
+
+  /**
+   * To store flow run metric values.
+   */
+  METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM);
+
+  private final ColumnHelper<FlowRunTable> column;
+  private final ColumnFamily<FlowRunTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  private final AggregationOperation aggOp;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily
+   *          that this column is stored in.
+   * @param columnPrefix
+   *          for this column.
+   * @param fra
+   *          the aggregation operation to attach to writes under this prefix.
+   */
+  private FlowRunColumnPrefix(ColumnFamily<FlowRunTable> columnFamily,
+      String columnPrefix, AggregationOperation fra) {
+    column = new ColumnHelper<FlowRunTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+          .encode(columnPrefix));
+    }
+    this.aggOp = fra;
+  }
+
+  /**
+   * @return the column name value
+   */
+  public String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  public byte[] getColumnPrefixBytes() {
+    return columnPrefixBytes.clone();
+  }
+
+  public AggregationOperation getAttribute() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   */
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowRunTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<String, Object> readResults(Result result) throws IOException {
+    return column.readResults(result, columnPrefixBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   */
+  public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
+      Result result) throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  }
+
+  /**
+   * Retrieve a {@link FlowRunColumnPrefix} given a name, or null if there is
+   * no match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumnPrefix} or null
+   */
+  public static final FlowRunColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+      // Find a match based only on name.
+      if (frcp.getColumnPrefix().equals(columnPrefix)) {
+        return frcp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve a {@link FlowRunColumnPrefix} given a name, or null if there is
+   * no match. The following holds true:
+   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+   * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)}
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowRunColumnPrefix}, or null if no
+   *         prefix matches both arguments.
+   */
+  public static final FlowRunColumnPrefix columnFor(
+      FlowRunColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs a unit test to confirm, and the javadoc needs updating
+    // to explain the null prefix case.
+
+    for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) {
+      // Find a match based on column family and name.
+      if (frcp.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || (frcp
+              .getColumnPrefix().equals(columnPrefix)))) {
+        return frcp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+}
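
On the read side, readResults() returns the columns under the metric prefix
as a map keyed by qualifier, which pairs with the per-metric checks in
TestHBaseStorageFlowRun above. A minimal sketch (output is illustrative):

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;

    public final class FlowRunMetricsReadSketch {
      // List the aggregated flow-level metrics stored in one flow run row.
      // `result` comes from a Get or Scan over the flow run table, as in
      // checkFlowRunTable() above.
      public static void printMetrics(Result result) throws IOException {
        Map<String, Object> metrics =
            FlowRunColumnPrefix.METRIC.readResults(result);
        for (Map.Entry<String, Object> metric : metrics.entrySet()) {
          System.out.println(metric.getKey() + " = " + metric.getValue());
        }
      }
    }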

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
new file mode 100644
index 0000000..f743e5e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+
+public class FlowRunCoprocessor extends BaseRegionObserver {
+
+  @SuppressWarnings("unused")
+  private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
+
+  private HRegion region;
+  /**
+   * Generates a timestamp that is unique per row within a region; the
+   * generator itself is per region.
+   */
+  private final TimestampGenerator timestampGenerator = new TimestampGenerator();
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+    if (e instanceof RegionCoprocessorEnvironment) {
+      RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+      this.region = env.getRegion();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * This method adds the tags onto the cells in the Put. It is presumed that
+   * all the cells in one Put have the same set of Tags. The existing cell
+   * timestamp is overwritten for non-metric cells, and each such cell gets a
+   * new unique timestamp generated by {@link TimestampGenerator}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Put,
+   * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
+   * org.apache.hadoop.hbase.client.Durability)
+   */
+  @Override
+  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
+      WALEdit edit, Durability durability) throws IOException {
+    Map<String, byte[]> attributes = put.getAttributesMap();
+
+    // Assumption is that all the cells in a put are the same operation.
+    List<Tag> tags = new ArrayList<>();
+    if ((attributes != null) && (attributes.size() > 0)) {
+      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
+        Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
+        tags.add(t);
+      }
+      byte[] tagByteArray = Tag.fromList(tags);
+      NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
+          Bytes.BYTES_COMPARATOR);
+      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
+          .entrySet()) {
+        List<Cell> newCells = new ArrayList<>(entry.getValue().size());
+        for (Cell cell : entry.getValue()) {
+          // Add the tags to each cell in the put; all the cells in one put
+          // are assumed to carry the same operation. Also, get a unique cell
+          // timestamp for non-metric cells so that cell versions are not
+          // inadvertently overwritten.
+          long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
+          newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
+              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+              cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
+              tagByteArray));
+        }
+        newFamilyMap.put(entry.getKey(), newCells);
+      } // for each entry
+      // Update the family map for the Put
+      put.setFamilyCellMap(newFamilyMap);
+    }
+  }
+
+  /**
+   * Determines whether the current cell's timestamp is to be used or a new
+   * unique cell timestamp is to be used. This is done to avoid inadvertently
+   * overwriting cells when writes come in very fast. But for metric cells,
+   * the cell timestamp signifies the metric timestamp, so we don't want to
+   * overwrite it.
+   *
+   * @param timestamp the timestamp currently set on the cell
+   * @param tags the tags on the cell
+   * @return cell timestamp
+   */
+  private long getCellTimestamp(long timestamp, List<Tag> tags) {
+    // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default)
+    // then use the generator
+    if (timestamp == HConstants.LATEST_TIMESTAMP) {
+      return timestampGenerator.getUniqueTimestamp();
+    } else {
+      return timestamp;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} so that it can correctly process the
+   * contents of the {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Get, java.util.List)
+   */
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
+      Get get, List<Cell> results) throws IOException {
+    Scan scan = new Scan(get);
+    scan.setMaxVersions();
+    RegionScanner scanner = null;
+    try {
+      scanner = new FlowScanner(region, scan.getBatch(),
+          region.getScanner(scan));
+      scanner.next(results);
+      e.bypass();
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Ensures that max versions are set for the Scan so that metrics can be
+   * correctly aggregated and min/max can be correctly determined.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
+   * .apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner preScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner s) throws IOException {
+    // set max versions for scan to see all
+    // versions to aggregate for metrics
+    scan.setMaxVersions();
+    return s;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} so that it can correctly process the
+   * contents of the {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
+   * org.apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner postScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner scanner) throws IOException {
+    return new FlowScanner(region, scan.getBatch(), scanner);
+  }
+}
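To see how prePut(..) gets its tags, here is a minimal sketch of the writer side (illustrative only, not part of the patch): the attribute name "SUM" and the application id value are assumptions standing in for the real constants in AggregationOperation and AggregationCompactionDimension, and flowRunTable is assumed to be an org.apache.hadoop.hbase.client.Table for the flow run table.

  // Mark a Put so the coprocessor tags its cells on the server side.
  Put put = new Put(rowKey);
  put.addColumn(FlowRunColumnFamily.INFO.getBytes(),
      Bytes.toBytes("m!mapInputRecords"), Bytes.toBytes(127L));
  // prePut(..) converts every Put attribute into a cell Tag, and assigns
  // non-metric cells a unique timestamp from TimestampGenerator.
  put.setAttribute("SUM", Bytes.toBytes("application_123_0001"));
  flowRunTable.put(put);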

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
new file mode 100644
index 0000000..e133241
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+/**
+ * Represents a rowkey for the flow run table.
+ */
+public class FlowRunRowKey {
+  // TODO: more methods are needed for this class like parse row key
+
+  /**
+   * Constructs a row key for the flow run table as follows:
+   * {@code clusterId!userId!flowId!invertedFlowRunId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String userId,
+      String flowId, Long flowRunId) {
+    byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
+        userId, flowId));
+    // Note that flowRunId is a long, so it cannot be encoded together with
+    // the string components.
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    return Separator.QUALIFIERS.join(first, second);
+  }
+
+}
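The inversion of the flow run id is what makes the most recent run sort first in a forward scan. TimelineWriterUtils.invert(..) presumably follows the usual pattern; a sketch of the idea (illustrative only):

  // Larger (newer) flow run ids map to smaller encoded values, so a forward
  // scan over clusterId!userId!flowId returns the newest runs first.
  static long invert(long key) {
    return Long.MAX_VALUE - key;
  }
  // e.g. invert(1002345678920L) < invert(1002345678919L)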

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
new file mode 100644
index 0000000..b1b93c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow run table has a single column family, info, which stores
+ * per-flow-run information aggregated across applications.
+ *
+ * Metrics are also stored in the info column family.
+ *
+ * Example flow run table record:
+ *
+ * <pre>
+ * flow_run table
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | flow_version:version7        |
+ * | userName!  |                              |
+ * | flowId!    | running_apps:1               |
+ * | flowRunId  |                              |
+ * |            | min_start_time:1392995080000 |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | min_start_time:1392995081012 |
+ * |            | #0:appId2                    |
+ * |            |                              |
+ * |            | min_start_time:1392993083210 |
+ * |            | #0:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | max_end_time:1392993084018   |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapInputRecords:127        |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapInputRecords:31         |
+ * |            | #2:appId2                    |
+ * |            |                              |
+ * |            | m!mapInputRecords:37         |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapOutputRecords:181       |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapOutputRecords:37        |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowRunTable extends BaseTable<FlowRunTable> {
+  /** flow run prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
+
+  /** config param name that specifies the flowrun table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowrun table name */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
+
+  private static final Log LOG = LogFactory.getLog(FlowRunTable.class);
+
+  /** default max number of versions */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowRunTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    flowRunTableDescp.addFamily(infoCF);
+
+    // TODO: figure the split policy
+    flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
+        .getCanonicalName());
+    admin.createTable(flowRunTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
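A sketch of standing the table up with a plain HBase connection (illustrative only; any HBase-enabled Configuration works, and error handling is elided):

  Configuration conf = HBaseConfiguration.create();
  try (Connection conn = ConnectionFactory.createConnection(conf);
       Admin admin = conn.getAdmin()) {
    // throws IOException if the table already exists, by design (see above)
    new FlowRunTable().createTable(admin, conf);
  }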

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
new file mode 100644
index 0000000..a1948aa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Invoked via the coprocessor when a Get or a Scan is issued for the flow run
+ * table. Looks through the list of cells per row, checks their tags, and
+ * operates on those cells as the tags dictate. Transforms reads of the stored
+ * metrics into calculated sums for each column, and finds the min and max for
+ * the start and end times in a flow run.
+ */
+class FlowScanner implements RegionScanner, Closeable {
+
+  private static final Log LOG = LogFactory.getLog(FlowScanner.class);
+
+  private final HRegion region;
+  private final InternalScanner flowRunScanner;
+  private RegionScanner regionScanner;
+  private final int limit;
+  private boolean hasMore;
+  private byte[] currentRow;
+  private List<Cell> availableCells = new ArrayList<>();
+  private int currentIndex;
+
+  FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
+    this.region = region;
+    this.limit = limit;
+    this.flowRunScanner = internalScanner;
+    if (internalScanner instanceof RegionScanner) {
+      this.regionScanner = (RegionScanner) internalScanner;
+    }
+    // TODO: note if it's compaction/flush
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
+   */
+  @Override
+  public HRegionInfo getRegionInfo() {
+    return region.getRegionInfo();
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells) throws IOException {
+    return nextRaw(cells, limit);
+  }
+
+  @Override
+  public boolean nextRaw(List<Cell> cells, int limit) throws IOException {
+    return nextInternal(cells, limit);
+  }
+
+  @Override
+  public boolean next(List<Cell> cells) throws IOException {
+    return next(cells, limit);
+  }
+
+  @Override
+  public boolean next(List<Cell> cells, int limit) throws IOException {
+    return nextInternal(cells, limit);
+  }
+
+  private String getAggregationCompactionDimension(List<Tag> tags) {
+    String appId = null;
+    for (Tag t : tags) {
+      if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+          .getType()) {
+        appId = Bytes.toString(t.getValue());
+      }
+    }
+    return appId;
+  }
+
+  /**
+   * This method loops through the cells in a given row of the
+   * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
+   * to process the contents. It then calculates the sum or min or max for each
+   * column or returns the cell as is.
+   *
+   * @param cells
+   * @param limit
+   * @return true if next row is available for the scanner, false otherwise
+   * @throws IOException
+   */
+  private boolean nextInternal(List<Cell> cells, int limit) throws IOException {
+    Cell cell = null;
+    startNext();
+    // Loop through all the cells in this row. For min/max/metrics we need to
+    // scan the entire set of cells to get the right one (with flush and
+    // compaction, the number of cells being scanned goes down). Cells are
+    // grouped per column qualifier and sorted by cell timestamp (latest to
+    // oldest) within a qualifier, so all cells for one qualifier come one
+    // after the other before we see the next column qualifier.
+    ByteArrayComparator comp = new ByteArrayComparator();
+    byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
+    AggregationOperation currentAggOp = null;
+    SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
+    Set<String> alreadySeenAggDim = new HashSet<>();
+    int addedCnt = 0;
+    while (((cell = peekAtNextCell(limit)) != null)
+        && (limit <= 0 || addedCnt < limit)) {
+      byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
+      if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
+        addedCnt += emitCells(cells, currentColumnCells, currentAggOp);
+        resetState(currentColumnCells, alreadySeenAggDim);
+        currentColumnQualifier = newColumnQualifier;
+        currentAggOp = getCurrentAggOp(cell);
+      }
+      collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim);
+      nextCell(limit);
+    }
+    if (!currentColumnCells.isEmpty()) {
+      emitCells(cells, currentColumnCells, currentAggOp);
+    }
+    return hasMore();
+  }
+
+  private AggregationOperation getCurrentAggOp(Cell cell) {
+    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLength());
+    // We assume that all the operations for a particular column are the same
+    return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
+  }
+
+  /**
+   * Resets the parameters to an initialized state for the next loop
+   * iteration.
+   *
+   * @param currentColumnCells
+   * @param alreadySeenAggDim
+   */
+  private void resetState(SortedSet<Cell> currentColumnCells,
+      Set<String> alreadySeenAggDim) {
+    currentColumnCells.clear();
+    alreadySeenAggDim.clear();
+  }
+
+  private void collectCells(SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp, Cell cell,
+      Set<String> alreadySeenAggDim) throws IOException {
+    if (currentAggOp == null) {
+      // not a min/max/metric cell, so just collect it as is; the caller
+      // advances the cell pointer after collectCells returns, so don't
+      // advance it here as well (that would skip the following cell)
+      currentColumnCells.add(cell);
+      return;
+    }
+
+    switch (currentAggOp) {
+    case MIN:
+      if (currentColumnCells.size() == 0) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMinCell = currentColumnCells.first();
+        Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp);
+        if (!currentMinCell.equals(newMinCell)) {
+          currentColumnCells.remove(currentMinCell);
+          currentColumnCells.add(newMinCell);
+        }
+      }
+      break;
+    case MAX:
+      if (currentColumnCells.size() == 0) {
+        currentColumnCells.add(cell);
+      } else {
+        Cell currentMaxCell = currentColumnCells.first();
+        Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp);
+        if (!currentMaxCell.equals(newMaxCell)) {
+          currentColumnCells.remove(currentMaxCell);
+          currentColumnCells.add(newMaxCell);
+        }
+      }
+      break;
+    case SUM:
+    case SUM_FINAL:
+      // only if this app has not been seen yet, add to current column cells
+      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+          cell.getTagsLength());
+      String aggDim = getAggregationCompactionDimension(tags);
+      if (alreadySeenAggDim.contains(aggDim)) {
+        // if this agg dimension has already been seen,
+        // since they show up in sorted order
+        // we drop the rest which are older
+        // in other words, this cell is older than previously seen cells
+        // for that agg dim
+      } else {
+        // not seen this agg dim, hence consider this cell in our working set
+        currentColumnCells.add(cell);
+        alreadySeenAggDim.add(aggDim);
+      }
+      break;
+    default:
+      break;
+    } // end of switch case
+  }
+
+  /*
+   * Processes the cells in input param currentColumnCells and populates
+   * List<Cell> cells as the output based on the input AggregationOperation
+   * parameter.
+   */
+  private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
+      AggregationOperation currentAggOp) throws IOException {
+    if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
+      return 0;
+    }
+    if (currentAggOp == null) {
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+
+    switch (currentAggOp) {
+    case MIN:
+    case MAX:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    case SUM:
+    case SUM_FINAL:
+      Cell sumCell = processSummation(currentColumnCells);
+      cells.add(sumCell);
+      return 1;
+    default:
+      cells.addAll(currentColumnCells);
+      return currentColumnCells.size();
+    }
+  }
+
+  /*
+   * Returns a cell whose value is the sum of all cell values in the input set.
+   * The new cell created has the timestamp of the most recent metric cell. The
+   * sum of a metric for a flow run is the summation up to the point of the
+   * last metric update in that flow run.
+   */
+  private Cell processSummation(SortedSet<Cell> currentColumnCells)
+      throws IOException {
+    Number sum = 0;
+    Number currentValue = 0;
+    long ts = 0L;
+    long mostCurrentTimestamp = 0L;
+    Cell mostRecentCell = null;
+    for (Cell cell : currentColumnCells) {
+      currentValue = (Number) GenericObjectMapper.read(CellUtil
+          .cloneValue(cell));
+      ts = cell.getTimestamp();
+      if (mostCurrentTimestamp < ts) {
+        mostCurrentTimestamp = ts;
+        mostRecentCell = cell;
+      }
+      sum = sum.longValue() + currentValue.longValue();
+    }
+    Cell sumCell = createNewCell(mostRecentCell, sum);
+    return sumCell;
+  }
+
+  /**
+   * Determines which cell is to be returned based on the values in each cell
+   * and the comparison operation MIN or MAX.
+   *
+   * @param previouslyChosenCell
+   * @param currentCell
+   * @param currentAggOp
+   * @return the cell which is the min (or max) cell
+   * @throws IOException
+   */
+  private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
+      AggregationOperation currentAggOp) throws IOException {
+    if (previouslyChosenCell == null) {
+      return currentCell;
+    }
+    try {
+      long previouslyChosenCellValue = ((Number) GenericObjectMapper
+          .read(CellUtil.cloneValue(previouslyChosenCell))).longValue();
+      long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil
+          .cloneValue(currentCell))).longValue();
+      switch (currentAggOp) {
+      case MIN:
+        if (currentCellValue < previouslyChosenCellValue) {
+          // new value is minimum, hence return this cell
+          return currentCell;
+        } else {
+          // previously chosen value is minimum, hence return previous min cell
+          return previouslyChosenCell;
+        }
+      case MAX:
+        if (currentCellValue > previouslyChosenCellValue) {
+          // new value is max, hence return this cell
+          return currentCell;
+        } else {
+          // previously chosen value is max, hence return previous max cell
+          return previouslyChosenCell;
+        }
+      default:
+        return currentCell;
+      }
+    } catch (IllegalArgumentException iae) {
+      LOG.error("caught iae during conversion to long ", iae);
+      return currentCell;
+    }
+  }
+
+  private Cell createNewCell(Cell origCell, Number number) throws IOException {
+    byte[] newValue = GenericObjectMapper.write(number);
+    return CellUtil.createCell(CellUtil.cloneRow(origCell),
+        CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+        origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+  }
+
+  @Override
+  public void close() throws IOException {
+    flowRunScanner.close();
+  }
+
+  /**
+   * Called to signal the start of the next() call by the scanner.
+   */
+  public void startNext() {
+    currentRow = null;
+  }
+
+  /**
+   * Returns whether or not the underlying scanner has more rows.
+   */
+  public boolean hasMore() {
+    return currentIndex < availableCells.size() || hasMore;
+  }
+
+  /**
+   * Returns the next available cell for the current row and advances the
+   * pointer to the next cell. This method can be called multiple times in a row
+   * to advance through all the available cells.
+   *
+   * @param limit
+   *          the limit of number of cells to return if the next batch must be
+   *          fetched by the wrapped scanner
+   * @return the next available cell or null if no more cells are available for
+   *         the current row
+   * @throws IOException
+   */
+  public Cell nextCell(int limit) throws IOException {
+    Cell cell = peekAtNextCell(limit);
+    if (cell != null) {
+      currentIndex++;
+    }
+    return cell;
+  }
+
+  /**
+   * Returns the next available cell for the current row, without advancing the
+   * pointer. Calling this method multiple times in a row will continue to
+   * return the same cell.
+   *
+   * @param limit
+   *          the limit of number of cells to return if the next batch must be
+   *          fetched by the wrapped scanner
+   * @return the next available cell or null if no more cells are available for
+   *         the current row
+   * @throws IOException
+   */
+  public Cell peekAtNextCell(int limit) throws IOException {
+    if (currentIndex >= availableCells.size()) {
+      // done with current batch
+      availableCells.clear();
+      currentIndex = 0;
+      hasMore = flowRunScanner.next(availableCells, limit);
+    }
+    Cell cell = null;
+    if (currentIndex < availableCells.size()) {
+      cell = availableCells.get(currentIndex);
+      if (currentRow == null) {
+        currentRow = CellUtil.cloneRow(cell);
+      } else if (!CellUtil.matchingRow(cell, currentRow)) {
+        // moved on to the next row
+        // don't use the current cell
+        // also signal no more cells for this row
+        return null;
+      }
+    }
+    return cell;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
+   */
+  @Override
+  public long getMaxResultSize() {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's scanner is not a RegionScanner");
+    }
+    return regionScanner.getMaxResultSize();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
+   */
+  @Override
+  public long getMvccReadPoint() {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.getMvccReadPoint();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
+   */
+  @Override
+  public boolean isFilterDone() throws IOException {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.isFilterDone() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.isFilterDone();
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
+   */
+  @Override
+  public boolean reseek(byte[] bytes) throws IOException {
+    if (regionScanner == null) {
+      throw new IllegalStateException(
+          "RegionScanner.reseek() called when the flow "
+              + "scanner's internal scanner is not a RegionScanner");
+    }
+    return regionScanner.reseek(bytes);
+  }
+}
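Stripped of the HBase types, the core of nextInternal(..) and emitCells(..) above is a single group-by-qualifier pass over a sorted cell stream. A sketch of that pattern for the SUM case (illustrative only; assumes long values, input sorted by qualifier, and java.util imports):

  // One pass over (qualifier, value) pairs sorted by qualifier; emit one
  // aggregate per qualifier, resetting state at each qualifier boundary
  // just as resetState(..) does above.
  static List<Long> emitSums(List<Map.Entry<String, Long>> sortedCells) {
    List<Long> out = new ArrayList<>();
    String current = null;
    long sum = 0;
    for (Map.Entry<String, Long> cell : sortedCells) {
      if (!cell.getKey().equals(current)) {
        if (current != null) {
          out.add(sum);   // emit the finished column
        }
        current = cell.getKey();
        sum = 0;          // reset per-column state
      }
      sum += cell.getValue();
    }
    if (current != null) {
      out.add(sum);       // emit the trailing column
    }
    return out;
  }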

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 2875e01..3962341 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@@ -88,20 +87,15 @@ public class TestHBaseTimelineStorage {
   }
 
   private static void createSchema() throws IOException {
-    new EntityTable()
-        .createTable(util.getHBaseAdmin(), util.getConfiguration());
-    new AppToFlowTable()
-        .createTable(util.getHBaseAdmin(), util.getConfiguration());
-    new ApplicationTable()
-        .createTable(util.getHBaseAdmin(), util.getConfiguration());
+    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
   }
 
   @Test
   public void testWriteApplicationToHBase() throws Exception {
     TimelineEntities te = new TimelineEntities();
     ApplicationEntity entity = new ApplicationEntity();
-    String id = "hello";
-    entity.setId(id);
+    String appId = "application_1000178881110_2002";
+    entity.setId(appId);
     long cTime = 1425016501000L;
     long mTime = 1425026901000L;
     entity.setCreatedTime(cTime);
@@ -173,12 +167,12 @@ public class TestHBaseTimelineStorage {
       String flow = "some_flow_name";
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
-      hbi.write(cluster, user, flow, flowVersion, runid, id, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
       hbi.stop();
 
       // retrieve the row
       byte[] rowKey =
-          ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
+          ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId);
       Get get = new Get(rowKey);
       get.setMaxVersions(Integer.MAX_VALUE);
       Connection conn = ConnectionFactory.createConnection(c1);
@@ -190,11 +184,11 @@ public class TestHBaseTimelineStorage {
       // check the row key
       byte[] row1 = result.getRow();
       assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
-          id));
+          appId));
 
       // check info column family
       String id1 = ApplicationColumn.ID.readResult(result).toString();
-      assertEquals(id, id1);
+      assertEquals(appId, id1);
 
       Number val =
           (Number) ApplicationColumn.CREATED_TIME.readResult(result);
@@ -252,17 +246,17 @@ public class TestHBaseTimelineStorage {
       assertEquals(metricValues, metricMap);
 
       // read the timeline entity using the reader this time
-      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
           entity.getType(), entity.getId(),
           EnumSet.of(TimelineReader.Field.ALL));
       Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
-          id, entity.getType(), null, null, null, null, null, null, null,
+          appId, entity.getType(), null, null, null, null, null, null, null,
           null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
       assertNotNull(e1);
       assertEquals(1, es1.size());
 
       // verify attributes
-      assertEquals(id, e1.getId());
+      assertEquals(appId, e1.getId());
       assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
           e1.getType());
       assertEquals(cTime, e1.getCreatedTime());
@@ -576,7 +570,7 @@ public class TestHBaseTimelineStorage {
       String flow = "other_flow_name";
       String flowVersion = "1111F01C2287BA";
       long runid = 1009876543218L;
-      String appName = "some app name";
+      String appName = "application_123465899910_1001";
       hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
       hbi.stop();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
new file mode 100644
index 0000000..f8331fa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+
+/**
+ * Generates the data/entities for the FlowRun and FlowActivity tables.
+ */
+class TestFlowDataGenerator {
+
+  private final static String metric1 = "MAP_SLOT_MILLIS";
+  private final static String metric2 = "HDFS_BYTES_READ";
+
+
+  static TimelineEntity getEntityMetricsApp1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 100000, 2);
+    metricValues.put(ts - 80000, 40);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = System.currentTimeMillis();
+    metricValues.put(ts - 100000, 31);
+    metricValues.put(ts - 80000, 57);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+
+    entity.addMetrics(metrics);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMetricsApp2() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 100000, 5L);
+    metricValues.put(ts - 80000, 101L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+    return entity;
+  }
+
+  static TimelineEntity getEntity1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHello";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 20000000000000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(1436512801000L);
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    return entity;
+  }
+
+  static TimelineEntity getEntityGreaterStartTime() {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setCreatedTime(30000000000000L);
+    entity.setId("flowRunHello with greater start time");
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setType(type);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    long endTs = 1439379885000L;
+    event.setTimestamp(endTs);
+    String expKey = "foo_event_greater";
+    String expVal = "test_app_greater";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMaxEndTime(long endTs) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId("flowRunHello Max End time");
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(endTs);
+    String expKey = "foo_even_max_ finished";
+    String expVal = "test_app_max_finished";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMinStartTime() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHelloMInStartTime";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 10000000000000L;
+    entity.setCreatedTime(cTime);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(System.currentTimeMillis());
+    entity.addEvent(event);
+    return entity;
+  }
+
+
+  static TimelineEntity getFlowApp1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowActivity_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    return entity;
+  }
+
+}
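These generators are consumed by the flow tests roughly as follows (a sketch, not the tests' exact code: the cluster/user/flow values and the app id are illustrative, and c1 stands for an HBase-backed Configuration as in the tests above):

  TimelineEntities te = new TimelineEntities();
  te.addEntity(TestFlowDataGenerator.getEntityMetricsApp1());
  te.addEntity(TestFlowDataGenerator.getEntityMetricsApp2());

  HBaseTimelineWriterImpl hbi = new HBaseTimelineWriterImpl();
  hbi.init(c1);
  hbi.write("cluster1", "user1", "flow1", "CF7022C10F1454", 1002345678919L,
      "application_11111111111111_1111", te);
  hbi.stop();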


[3/3] hadoop git commit: YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)

Posted by sj...@apache.org.
YARN-3901. Populate flow run data in the flow_run & flow activity tables (Vrushali C via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b37985e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b37985e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b37985e

Branch: refs/heads/YARN-2928
Commit: 4b37985e6a238c7f84eecc4e7eadd30dffc90b88
Parents: b1960e0
Author: Sangjin Lee <sj...@apache.org>
Authored: Thu Sep 17 10:34:52 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Sep 17 10:34:52 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop-yarn-server-timelineservice/pom.xml  |  13 +
 .../storage/HBaseTimelineWriterImpl.java        | 179 ++++++-
 .../storage/TimelineSchemaCreator.java          |  22 +-
 .../storage/application/ApplicationColumn.java  |   5 +-
 .../application/ApplicationColumnPrefix.java    |  15 +-
 .../storage/apptoflow/AppToFlowColumn.java      |   6 +-
 .../timelineservice/storage/common/Column.java  |   6 +-
 .../storage/common/ColumnHelper.java            |  93 +++-
 .../storage/common/ColumnPrefix.java            |  28 +-
 .../storage/common/TimelineWriterUtils.java     | 185 +++++++
 .../storage/common/TimestampGenerator.java      | 112 +++++
 .../storage/common/package-info.java            |  24 -
 .../storage/entity/EntityColumn.java            |   6 +-
 .../storage/entity/EntityColumnPrefix.java      |  20 +-
 .../flow/AggregationCompactionDimension.java    |  63 +++
 .../storage/flow/AggregationOperation.java      |  87 ++++
 .../timelineservice/storage/flow/Attribute.java |  39 ++
 .../storage/flow/FlowActivityColumnFamily.java  |  54 +++
 .../storage/flow/FlowActivityColumnPrefix.java  | 243 ++++++++++
 .../storage/flow/FlowActivityRowKey.java        | 113 +++++
 .../storage/flow/FlowActivityTable.java         | 107 ++++
 .../storage/flow/FlowRunColumn.java             | 161 ++++++
 .../storage/flow/FlowRunColumnFamily.java       |  54 +++
 .../storage/flow/FlowRunColumnPrefix.java       | 239 +++++++++
 .../storage/flow/FlowRunCoprocessor.java        | 210 ++++++++
 .../storage/flow/FlowRunRowKey.java             |  50 ++
 .../storage/flow/FlowRunTable.java              | 141 ++++++
 .../storage/flow/FlowScanner.java               | 486 +++++++++++++++++++
 .../storage/TestHBaseTimelineStorage.java       |  28 +-
 .../storage/flow/TestFlowDataGenerator.java     | 213 ++++++++
 .../flow/TestHBaseStorageFlowActivity.java      | 372 ++++++++++++++
 .../storage/flow/TestHBaseStorageFlowRun.java   | 290 +++++++++++
 33 files changed, 3562 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0d43135..cc0f014 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -103,6 +103,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4102. Add a "skip existing table" mode for timeline schema creator (Li
     Lu via sjlee)
 
+    YARN-3901. Populate flow run data in the flow_run & flow activity tables
+    (Vrushali C via sjlee)
+
   IMPROVEMENTS
 
     YARN-3276. Code cleanup for timeline service API records. (Junping Du via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index da7fadf..758feb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -174,6 +174,19 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <additionnalDependencies>
+            <additionnalDependency>
+              <groupId>junit</groupId>
+              <artifactId>junit</artifactId>
+              <version>4.11</version>
+            </additionnalDependency>
+          </additionnalDependencies>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 772002d..7c4a5da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -33,11 +33,10 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -53,23 +52,36 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 
 /**
- * This implements a hbase based backend for storing application timeline entity
+ * This implements an HBase-based backend for storing the timeline entity
  * information.
+ * It writes to multiple tables in the backend.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class HBaseTimelineWriterImpl extends AbstractService implements
     TimelineWriter {
 
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineWriterImpl.class);
+
   private Connection conn;
   private TypedBufferedMutator<EntityTable> entityTable;
   private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
   private TypedBufferedMutator<ApplicationTable> applicationTable;
-
-  private static final Log LOG = LogFactory
-      .getLog(HBaseTimelineWriterImpl.class);
+  private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+  private TypedBufferedMutator<FlowRunTable> flowRunTable;
 
   public HBaseTimelineWriterImpl() {
     super(HBaseTimelineWriterImpl.class.getName());
@@ -91,6 +103,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
     appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn);
     applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn);
+    flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn);
+    flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -111,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
       // if the entity is the application, the destination is the application
       // table
-      boolean isApplication = isApplicationEntity(te);
+      boolean isApplication = TimelineWriterUtils.isApplicationEntity(te);
       byte[] rowKey = isApplication ?
           ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
               appId) :
@@ -124,37 +138,139 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       storeMetrics(rowKey, te.getMetrics(), isApplication);
       storeRelations(rowKey, te, isApplication);
 
-      if (isApplicationCreated(te)) {
-        onApplicationCreated(
-            clusterId, userId, flowName, flowVersion, flowRunId, appId, te);
+      if (isApplication) {
+        if (TimelineWriterUtils.isApplicationCreated(te)) {
+          onApplicationCreated(clusterId, userId, flowName, flowVersion,
+              flowRunId, appId, te);
+        }
+        // if it's an application entity, store metrics
+        storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
+            appId, te);
+        // if the application has finished, store its finish time and
+        // write the final values of all metrics
+        if (TimelineWriterUtils.isApplicationFinished(te)) {
+          onApplicationFinished(clusterId, userId, flowName, flowVersion,
+              flowRunId, appId, te);
+        }
       }
     }
     return putStatus;
   }
 
-  private static boolean isApplicationEntity(TimelineEntity te) {
-    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  private void onApplicationCreated(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    // store in App to flow table
+    storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId,
+        appId, te);
+    // store in flow run table
+    storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
+        flowRunId, appId, te);
+    // store in flow activity table
+    storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
+        flowRunId, appId, te);
   }
 
-  private static boolean isApplicationCreated(TimelineEntity te) {
-    if (isApplicationEntity(te)) {
-      for (TimelineEvent event : te.getEvents()) {
-        if (event.getId().equals(
-            ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
-          return true;
-        }
-      }
-    }
-    return false;
+  /*
+   * Updates the {@link FlowActivityTable} with the Application TimelineEntity
+   * information
+   */
+  private void storeInFlowActivityTable(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName);
+    byte[] qualifier = GenericObjectMapper.write(flowRunId);
+    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+        null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
   }
 
-  private void onApplicationCreated(String clusterId, String userId,
+  /*
+   * Updates the {@link FlowRunTable} with Application Created information.
+   */
+  private void storeAppCreatedInFlowRunTable(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+        flowRunId);
+    FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
+        te.getCreatedTime(),
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+
+  private void storeInAppToFlowTable(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
       TimelineEntity te) throws IOException {
     byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
     AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
-    AppToFlowColumn.FLOW_RUN_ID.store(
-        rowKey, appToFlowTable, null, flowRunId);
+    AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
+   * application has finished.
+   */
+  private void onApplicationFinished(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntity te) throws IOException {
+    // store in flow run table
+    storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
+        appId, te);
+
+    // indicate in the flow activity table that the app has finished
+    storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
+        flowRunId, appId, te);
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} with Application Finished information.
+   */
+  private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
+      String flowName, long flowRunId, String appId, TimelineEntity te)
+      throws IOException {
+    byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+        flowRunId);
+    Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
+        .getAttribute(appId);
+    FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
+        TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId);
+
+    // store the final value of metrics since application has finished
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      storeFlowMetrics(rowKey, metrics, attributeAppId,
+          AggregationOperation.SUM_FINAL.getAttribute());
+    }
+  }
+
+  /*
+   * Updates the {@link FlowRunTable} with Application Metrics
+   */
+  private void storeFlowMetricsAppRunning(String clusterId, String userId,
+      String flowName, long flowRunId, String appId, TimelineEntity te)
+      throws IOException {
+    Set<TimelineMetric> metrics = te.getMetrics();
+    if (metrics != null) {
+      byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
+          flowRunId);
+      storeFlowMetrics(rowKey, metrics,
+          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+    }
+  }
+
+  private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
+      Attribute... attributes) throws IOException {
+    for (TimelineMetric metric : metrics) {
+      String metricColumnQualifier = metric.getId();
+      Map<Long, Number> timeseries = metric.getValues();
+      for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
+        Long timestamp = timeseriesEntry.getKey();
+        FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable,
+            metricColumnQualifier, timestamp, timeseriesEntry.getValue(),
+            attributes);
+      }
+    }
   }
 
   private void storeRelations(byte[] rowKey, TimelineEntity te,
@@ -184,7 +300,6 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       // id3?id4?id5
       String compoundValue =
           Separator.VALUES.joinEncoded(connectedEntity.getValue());
-
       columnPrefix.store(rowKey, table, connectedEntity.getKey(), null,
           compoundValue);
     }
@@ -342,6 +457,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     entityTable.flush();
     appToFlowTable.flush();
     applicationTable.flush();
+    flowRunTable.flush();
+    flowActivityTable.flush();
   }
 
   /**
@@ -364,6 +481,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       LOG.info("closing the application table");
       applicationTable.close();
     }
+    if (flowRunTable != null) {
+      LOG.info("closing the flow run table");
+      // The close API performs flushing and releases any resources held
+      flowRunTable.close();
+    }
+    if (flowActivityTable != null) {
+      LOG.info("closing the flowActivityTable table");
+      // The close API performs flushing and releases any resources held
+      flowActivityTable.close();
+    }
     if (conn != null) {
       LOG.info("closing the hbase Connection");
       conn.close();
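
As a usage sketch (not part of the patch itself): the writer is an
AbstractService, so a caller initializes it with an HBase Configuration,
writes TimelineEntities, and stops it. The identifiers and entity values
below are illustrative.

    Configuration conf = HBaseConfiguration.create();
    HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl();
    writer.init(conf);    // opens the Connection and the five table mutators
    writer.start();

    TimelineEntities entities = new TimelineEntities();
    TimelineEntity app = new TimelineEntity();
    app.setType(TimelineEntityType.YARN_APPLICATION.toString());
    app.setId("application_1111111111_2222");
    entities.addEntity(app);

    // Application entities fan out to the application, app-to-flow, flow run
    // and flow activity tables; other entity types go to the entity table.
    writer.write("cluster1", "user1", "flow_name", "CF7022C10F1354",
        1002345678919L, "application_1111111111_2222", entities);

    writer.stop();        // flushes and closes all tables and the connection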

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index e7e51a7..cbcff4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 
 /**
  * This creates the schema for a hbase based backend for storing application
@@ -199,7 +201,7 @@ public class TimelineSchemaCreator {
     return commandLine;
   }
 
-  private static void createAllTables(Configuration hbaseConf,
+  public static void createAllTables(Configuration hbaseConf,
       boolean skipExisting) throws IOException {
 
     Connection conn = null;
@@ -236,6 +238,24 @@ public class TimelineSchemaCreator {
           throw e;
         }
       }
+      try {
+        new FlowRunTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
+      try {
+        new FlowActivityTable().createTable(admin, hbaseConf);
+      } catch (IOException e) {
+        if (skipExisting) {
+          LOG.warn("Skip and continue on: " + e.getMessage());
+        } else {
+          throw e;
+        }
+      }
     } finally {
       if (conn != null) {
         conn.close();
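
Since createAllTables(...) is now public, tests can create the whole schema,
including the two new flow tables, in one call. A sketch (the mini-cluster
setup is illustrative):

    HBaseTestingUtility util = new HBaseTestingUtility();
    util.startMiniCluster();
    // Creates the entity, app-to-flow, application, flow run and flow
    // activity tables; skipExisting=true would skip tables that already
    // exist instead of failing.
    TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);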

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
index c028386..802626d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Identifies fully qualified columns for the {@link ApplicationTable}.
@@ -76,9 +77,9 @@ public enum ApplicationColumn implements Column<ApplicationTable> {
 
   public void store(byte[] rowKey,
       TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
-      Object inputValue) throws IOException {
+      Object inputValue, Attribute... attributes) throws IOException {
     column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
-        inputValue);
+        inputValue, attributes);
   }
 
   public Object readResult(Result result) throws IOException {
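
With the Attribute varargs in place, a caller can tag a single-column write;
a sketch (the CREATED_TIME constant and all values here are illustrative):

    byte[] rowKey = ApplicationRowKey.getRowKey("cluster1", "user1",
        "flow_name", 1002345678919L, "application_1111111111_2222");
    ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable,
        null /* timestamp */, 1425026900000L,
        AggregationCompactionDimension.APPLICATION_ID
            .getAttribute("application_1111111111_2222"));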

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index ad1def6..d7b5773 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Identifies partially qualified columns for the application table.
@@ -112,7 +113,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
    */
   public void store(byte[] rowKey,
       TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
-      Long timestamp, Object inputValue) throws IOException {
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
 
     // Null check
     if (qualifier == null) {
@@ -123,8 +125,9 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
     byte[] columnQualifier =
         ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
 
-    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
-  }
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+  }
 
   /*
    * (non-Javadoc)
@@ -137,7 +140,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
    */
   public void store(byte[] rowKey,
       TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
-      Long timestamp, Object inputValue) throws IOException {
+      Long timestamp, Object inputValue, Attribute...attributes)
+      throws IOException {
 
     // Null check
     if (qualifier == null) {
@@ -148,7 +152,8 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
     byte[] columnQualifier =
         ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
 
-    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
index 423037a..859fdca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java
@@ -25,8 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Identifies fully qualified columns for the {@link AppToFlowTable}.
@@ -67,9 +69,9 @@ public enum AppToFlowColumn implements Column<AppToFlowTable> {
 
   public void store(byte[] rowKey,
       TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp,
-      Object inputValue) throws IOException {
+      Object inputValue, Attribute... attributes) throws IOException {
     column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
-        inputValue);
+        inputValue, attributes);
   }
 
   public Object readResult(Result result) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
index 3397d62..64c1cda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * A Column represents the way to store a fully qualified column in a specific
@@ -38,12 +39,15 @@ public interface Column<T> {
    *          column.
    * @param timestamp version timestamp. When null the server timestamp will be
    *          used.
+   * @param attributes attributes for this mutation, used in the coprocessor
+   *          to set/read the cell tags. Can be null.
    * @param inputValue the value to write to the rowKey and column qualifier.
    *          Nothing gets written when null.
    * @throws IOException
    */
   public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
-      Long timestamp, Object inputValue) throws IOException;
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException;
 
   /**
    * Get the latest version of this specified column. Note: this call clones the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
index f1b7c58..3a2e088 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 /**
  * This class is meant to be used only by explicit Columns, and not directly to
  * write by clients.
@@ -58,31 +59,66 @@ public class ColumnHelper<T> {
    * Sends a Mutation to the table. The mutations will be buffered and sent over
    * the wire as part of a batch.
    *
-   * @param rowKey identifying the row to write. Nothing gets written when null.
-   * @param tableMutator used to modify the underlying HBase table
-   * @param columnQualifier column qualifier. Nothing gets written when null.
-   * @param timestamp version timestamp. When null the server timestamp will be
-   *          used.
-   * @param inputValue the value to write to the rowKey and column qualifier.
-   *          Nothing gets written when null.
+   * @param rowKey
+   *          identifying the row to write. Nothing gets written when null.
+   * @param tableMutator
+   *          used to modify the underlying HBase table
+   * @param columnQualifier
+   *          column qualifier. Nothing gets written when null.
+   * @param timestamp
+   *          version timestamp. When null, the current timestamp multiplied by
+   *          TimestampGenerator.TS_MULTIPLIER, with the last 3 digits of the
+   *          app id added, will be used.
+   * @param inputValue
+   *          the value to write to the rowKey and column qualifier. Nothing
+   *          gets written when null.
    * @throws IOException
    */
   public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator,
-      byte[] columnQualifier, Long timestamp, Object inputValue)
-      throws IOException {
+      byte[] columnQualifier, Long timestamp, Object inputValue,
+      Attribute... attributes) throws IOException {
     if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) {
       return;
     }
     Put p = new Put(rowKey);
+    timestamp = getPutTimestamp(timestamp, attributes);
+    p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
+        GenericObjectMapper.write(inputValue));
+    if ((attributes != null) && (attributes.length > 0)) {
+      for (Attribute attribute : attributes) {
+        p.setAttribute(attribute.getName(), attribute.getValue());
+      }
+    }
+    tableMutator.mutate(p);
+  }
 
+  /*
+   * Figures out the cell timestamp used in the Put for storing into the flow
+   * run table. We left-shift the timestamp and supplement it with the app id
+   * so that there are no collisions in the flow run table's cells.
+   */
+  private long getPutTimestamp(Long timestamp, Attribute[] attributes) {
     if (timestamp == null) {
-      p.addColumn(columnFamilyBytes, columnQualifier,
-          GenericObjectMapper.write(inputValue));
-    } else {
-      p.addColumn(columnFamilyBytes, columnQualifier, timestamp,
-          GenericObjectMapper.write(inputValue));
+      timestamp = System.currentTimeMillis();
     }
-    tableMutator.mutate(p);
+    String appId = getAppIdFromAttributes(attributes);
+    long supplementedTS = TimestampGenerator.getSupplementedTimestamp(
+        timestamp, appId);
+    return supplementedTS;
+  }
+
+  private String getAppIdFromAttributes(Attribute[] attributes) {
+    if (attributes == null) {
+      return null;
+    }
+    String appId = null;
+    for (Attribute attribute : attributes) {
+      if (AggregationCompactionDimension.APPLICATION_ID.toString().equals(
+          attribute.getName())) {
+        appId = Bytes.toString(attribute.getValue());
+      }
+    }
+    return appId;
   }
 
   /**
@@ -171,7 +207,9 @@ public class ColumnHelper<T> {
               for (Entry<Long, byte[]> cell : cells.entrySet()) {
                 V value =
                     (V) GenericObjectMapper.read(cell.getValue());
-                cellResults.put(cell.getKey(), value);
+                cellResults.put(
+                    TimestampGenerator.getTruncatedTimestamp(cell.getKey()),
+                    value);
               }
             }
             results.put(columnName, cellResults);
@@ -315,6 +353,27 @@ public class ColumnHelper<T> {
   /**
    * @param columnPrefixBytes The byte representation for the column prefix.
    *          Should not contain {@link Separator#QUALIFIERS}.
+   * @param qualifier for the remainder of the column.
+   * @return fully sanitized column qualifier that is a combination of prefix
+   *         and qualifier. If prefix is null, the result is simply the encoded
+   *         qualifier without any separator.
+   */
+  public static byte[] getColumnQualifier(byte[] columnPrefixBytes,
+      long qualifier) {
+
+    if (columnPrefixBytes == null) {
+      return Bytes.toBytes(qualifier);
+    }
+
+    // Join the column prefix and the byte form of the qualifier with the
+    // qualifier separator.
+    byte[] columnQualifier =
+        Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier));
+    return columnQualifier;
+  }
+
+  /**
+   * @param columnPrefixBytes The byte representation for the column prefix.
+   *          Should not contain {@link Separator#QUALIFIERS}.
    * @param qualifier the byte representation for the remainder of the column.
    * @return fully sanitized column qualifier that is a combination of prefix
    *         and qualifier. If prefix is null, the result is simply the encoded
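
The attributes travel on the Put itself rather than in the cell value, which
is how a coprocessor on the flow run table can observe them. A rough sketch
of the HBase mechanics (not the coprocessor shipped with this patch):

    // Writer side, as in ColumnHelper.store(...) above:
    Put p = new Put(rowKey);
    p.addColumn(columnFamilyBytes, columnQualifier, putTimestamp, valueBytes);
    p.setAttribute("APPLICATION_ID",
        Bytes.toBytes("application_1111111111_2222"));

    // Coprocessor side, e.g. in a RegionObserver prePut hook:
    byte[] appId = p.getAttribute("APPLICATION_ID");
    // From here the attribute can be turned into a cell Tag via
    // TimelineWriterUtils.getTagFromAttribute(...).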

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index 509ff49..db49098 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -23,6 +23,7 @@ import java.util.NavigableMap;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Used to represent a partially qualified column, where the actual column name
@@ -43,12 +44,36 @@ public interface ColumnPrefix<T> {
    * @param qualifier column qualifier. Nothing gets written when null.
    * @param timestamp version timestamp. When null the server timestamp will be
    *          used.
+   * @param attributes attributes for the mutation that are used by the
+   *          coprocessor to set/read the cell tags
    * @param inputValue the value to write to the rowKey and column qualifier.
    *          Nothing gets written when null.
    * @throws IOException
    */
   public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
-      String qualifier, Long timestamp, Object inputValue) throws IOException;
+      byte[] qualifier, Long timestamp, Object inputValue,
+      Attribute... attributes) throws IOException;
+
+  /**
+   * Sends a Mutation to the table. The mutations will be buffered and sent over
+   * the wire as part of a batch.
+   *
+   * @param rowKey identifying the row to write. Nothing gets written when null.
+   * @param tableMutator used to modify the underlying HBase table. Caller is
+   *          responsible to pass a mutator for the table that actually has this
+   *          column.
+   * @param qualifier column qualifier. Nothing gets written when null.
+   * @param timestamp version timestamp. When null the server timestamp will be
+   *          used.
+   * @param attributes attributes for the mutation that are used by the
+   *          coprocessor to set/read the cell tags
+   * @param inputValue the value to write to the rowKey and column qualifier.
+   *          Nothing gets written when null.
+   * @throws IOException
+   */
+  public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator,
+      String qualifier, Long timestamp, Object inputValue,
+      Attribute... attributes) throws IOException;
 
   /**
    * Get the latest version of this specified column. Note: this call clones the
@@ -81,4 +106,5 @@ public interface ColumnPrefix<T> {
    */
   public <V> NavigableMap<String, NavigableMap<Long, V>>
       readResultsWithTimestamps(Result result) throws IOException;
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
index 58bdedc7e..371371a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
@@ -19,9 +19,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * bunch of utility functions used across TimelineWriter classes
@@ -36,6 +46,9 @@ public class TimelineWriterUtils {
   /** indicator for no limits for splitting */
   public static final int NO_LIMIT_SPLIT = -1;
 
+  /** milliseconds in one day */
+  public static final long MILLIS_ONE_DAY = 86400000L;
+
   /**
    * Splits the source array into multiple array segments using the given
    * separator, up to a maximum of count items. This will naturally produce
@@ -140,4 +153,176 @@ public class TimelineWriterUtils {
     return Long.MAX_VALUE - key;
   }
 
+  /**
+   * Returns the timestamp of the day's start (midnight UTC, 00:00:00)
+   * for a given input timestamp.
+   *
+   * @param ts
+   * @return the timestamp of that day's beginning (midnight UTC)
+   */
+  public static long getTopOfTheDayTimestamp(long ts) {
+    long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
+    return dayTimestamp;
+  }
+
+  /**
+   * Combines the input array of attributes and the input aggregation operation
+   * into a new array of attributes.
+   *
+   * @param attributes
+   * @param aggOp
+   * @return array of combined attributes
+   */
+  public static Attribute[] combineAttributes(Attribute[] attributes,
+      AggregationOperation aggOp) {
+    int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
+    Attribute[] combinedAttributes = new Attribute[newLength];
+
+    if (attributes != null) {
+      System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
+    }
+
+    if (aggOp != null) {
+      Attribute a2 = aggOp.getAttribute();
+      combinedAttributes[newLength - 1] = a2;
+    }
+    return combinedAttributes;
+  }
+
+  /**
+   * Returns a number for the new array size. The new array is the combination
+   * of input array of attributes and the input aggregation operation.
+   *
+   * @param attributes
+   * @param aggOp
+   * @return the size for the new array
+   */
+  private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+      AggregationOperation aggOp) {
+    int oldLength = getAttributesLength(attributes);
+    int aggLength = getAppOpLength(aggOp);
+    return oldLength + aggLength;
+  }
+
+  private static int getAppOpLength(AggregationOperation aggOp) {
+    if (aggOp != null) {
+      return 1;
+    }
+    return 0;
+  }
+
+  private static int getAttributesLength(Attribute[] attributes) {
+    if (attributes != null) {
+      return attributes.length;
+    }
+    return 0;
+  }
+
+  /**
+   * Checks if an application has finished.
+   *
+   * @param te
+   * @return true if the application has finished, false otherwise
+   */
+  public static boolean isApplicationFinished(TimelineEntity te) {
+    SortedSet<TimelineEvent> allEvents = te.getEvents();
+    if ((allEvents != null) && (allEvents.size() > 0)) {
+      TimelineEvent event = allEvents.last();
+      if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Gets the time at which an app finished.
+   *
+   * @param te
+   * @return the app's finish time, or 0L if it has not finished
+   */
+  public static long getApplicationFinishedTime(TimelineEntity te) {
+    SortedSet<TimelineEvent> allEvents = te.getEvents();
+    if ((allEvents != null) && (allEvents.size() > 0)) {
+      TimelineEvent event = allEvents.last();
+      if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+        return event.getTimestamp();
+      }
+    }
+    return 0L;
+  }
+
+  /**
+   * Checks if the input TimelineEntity object is an ApplicationEntity.
+   *
+   * @param te
+   * @return true if input is an ApplicationEntity, false otherwise
+   */
+  public static boolean isApplicationEntity(TimelineEntity te) {
+    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  }
+
+  /**
+   * Checks for the APPLICATION_CREATED event.
+   *
+   * @param te
+   * @return true if the APPLICATION_CREATED event exists, false otherwise
+   */
+  public static boolean isApplicationCreated(TimelineEntity te) {
+    if (isApplicationEntity(te)) {
+      for (TimelineEvent event : te.getEvents()) {
+        if (event.getId()
+            .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the first aggregation operation found in the list of input tags,
+   * or null if none is present.
+   *
+   * @param tags
+   * @return AggregationOperation
+   */
+  public static AggregationOperation getAggregationOperationFromTagsList(
+      List<Tag> tags) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      for (Tag tag : tags) {
+        if (tag.getType() == aggOp.getTagType()) {
+          return aggOp;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a {@link Tag} from the input attribute.
+   *
+   * @param attribute
+   * @return Tag
+   */
+  public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
+    // attribute could be either an Aggregation Operation or
+    // an Aggregation Dimension
+    // Get the Tag type from either
+    AggregationOperation aggOp = AggregationOperation
+        .getAggregationOperation(attribute.getKey());
+    if (aggOp != null) {
+      Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+      return t;
+    }
+
+    AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
+        .getAggregationCompactionDimension(attribute.getKey());
+    if (aggCompactDim != null) {
+      Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+      return t;
+    }
+    return null;
+  }
+
 }
\ No newline at end of file
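
Two of the new helpers in action (values illustrative; the day arithmetic is
in UTC):

    // Top of the day is plain modular arithmetic on epoch milliseconds.
    long ts = 1442519498000L;                  // 2015-09-17T19:51:38Z
    long day = TimelineWriterUtils.getTopOfTheDayTimestamp(ts);
    // day == 1442448000000L                   // 2015-09-17T00:00:00Z

    // Combining attributes with an aggregation operation:
    Attribute[] combined = TimelineWriterUtils.combineAttributes(
        new Attribute[] { AggregationCompactionDimension.APPLICATION_ID
            .getAttribute("application_1111111111_2222") },
        AggregationOperation.SUM_FINAL);
    // combined.length == 2: the app id dimension, then SUM_FINAL in the
    // last slot.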

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
new file mode 100644
index 0000000..555b64e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * Utility class that allows HBase coprocessors to interact with unique
+ * timestamps.
+ */
+public class TimestampGenerator {
+
+  /*
+   * If this is changed, then reading cell timestamps written with an older
+   * multiplier value will not work.
+   */
+  public static final long TS_MULTIPLIER = 1000L;
+
+  private final AtomicLong lastTimestamp = new AtomicLong();
+
+  /**
+   * Returns the current wall clock time in milliseconds, multiplied by the
+   * required precision.
+   */
+  public long currentTime() {
+    // We want to align cell timestamps with current time.
+    // Cell timestamps must not be less than
+    // System.currentTimeMillis() * TS_MULTIPLIER.
+    return System.currentTimeMillis() * TS_MULTIPLIER;
+  }
+
+  /**
+   * Returns a timestamp value unique within the scope of this
+   * {@code TimestampGenerator} instance. For usage by HBase
+   * {@code RegionObserver} coprocessors, this normally means unique within a
+   * given region.
+   *
+   * Unlikely scenario of generating a non-unique timestamp: if there is a
+   * sustained rate of more than 1M HBase writes per second AND the region
+   * fails over within the time range of the generated timestamps, then there
+   * may be collisions writing to a cell version of the same column.
+   */
+  public long getUniqueTimestamp() {
+    long lastTs;
+    long nextTs;
+    do {
+      lastTs = lastTimestamp.get();
+      nextTs = Math.max(lastTs + 1, currentTime());
+    } while (!lastTimestamp.compareAndSet(lastTs, nextTs));
+    return nextTs;
+  }
+
+  /**
+   * Returns the timestamp multiplied by TS_MULTIPLIER, with the last few
+   * digits of the application id added as a suffix.
+   *
+   * Unlikely scenario of generating a duplicate timestamp: if more than 1000
+   * concurrent apps are running in one flow run AND write to the same column
+   * at the same time, then, say, an appId of 1001 will overlap with an appId
+   * of 001 and there may be collisions for that flow run's specific column.
+   *
+   * @param incomingTS
+   * @param appId
+   * @return a timestamp multiplied by TS_MULTIPLIER with the last few digits
+   *         of the application id appended
+   */
+  public static long getSupplementedTimestamp(long incomingTS, String appId) {
+    long suffix = getAppIdSuffix(appId);
+    long outgoingTS = incomingTS * TS_MULTIPLIER + suffix;
+    return outgoingTS;
+  }
+
+  private static long getAppIdSuffix(String appIdStr) {
+    if (appIdStr == null) {
+      return 0L;
+    }
+    ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+    long id = appId.getId() % TS_MULTIPLIER;
+    return id;
+  }
+
+  /**
+   * Truncates the last few digits of the timestamp that were supplemented by
+   * the TimestampGenerator#getSupplementedTimestamp function.
+   *
+   * @param incomingTS
+   * @return a truncated timestamp value
+   */
+  public static long getTruncatedTimestamp(long incomingTS) {
+    return incomingTS / TS_MULTIPLIER;
+  }
+}
\ No newline at end of file
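
A worked example of the supplement/truncate round trip (numbers
illustrative):

    long ts = 1425026901952L;  // incoming cell timestamp, in milliseconds
    // "application_1440000000000_1001" has id 1001, and 1001 % 1000 == 1
    long put = TimestampGenerator.getSupplementedTimestamp(ts,
        "application_1440000000000_1001");
    // put == 1425026901952L * 1000 + 1 == 1425026901952001L

    long original = TimestampGenerator.getTruncatedTimestamp(put);
    // original == 1425026901952L, the millisecond value readers expect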

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
deleted file mode 100644
index 32577fb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 26e7748..8ae19b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Identifies fully qualified columns for the {@link EntityTable}.
@@ -81,9 +83,9 @@ public enum EntityColumn implements Column<EntityTable> {
 
   public void store(byte[] rowKey,
       TypedBufferedMutator<EntityTable> tableMutator, Long timestamp,
-      Object inputValue) throws IOException {
+      Object inputValue, Attribute... attributes) throws IOException {
     column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
-        inputValue);
+        inputValue, attributes);
   }
 
   public Object readResult(Result result) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 75ff742..0d4e5a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 
 /**
  * Identifies partially qualified columns for the entity table.
@@ -108,11 +109,13 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
    * #store(byte[],
    * org.apache.hadoop.yarn.server.timelineservice.storage.common.
-   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object)
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
    */
   public void store(byte[] rowKey,
       TypedBufferedMutator<EntityTable> tableMutator, String qualifier,
-      Long timestamp, Object inputValue) throws IOException {
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
 
     // Null check
     if (qualifier == null) {
@@ -123,8 +126,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
     byte[] columnQualifier =
         ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
 
-    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
-  }
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+  }
 
   /*
    * (non-Javadoc)
@@ -137,7 +141,8 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
    */
   public void store(byte[] rowKey,
       TypedBufferedMutator<EntityTable> tableMutator, byte[] qualifier,
-      Long timestamp, Object inputValue) throws IOException {
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
 
     // Null check
     if (qualifier == null) {
@@ -148,8 +153,9 @@ public enum EntityColumnPrefix implements ColumnPrefix<EntityTable> {
     byte[] columnQualifier =
         ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier);
 
-    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue);
-  }
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        attributes);
+  }
 
   /*
    * (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
new file mode 100644
index 0000000..ff12c7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the compaction dimensions for the data in the
+ * {@link FlowRunTable}.
+ */
+public enum AggregationCompactionDimension {
+
+  /**
+   * The application id.
+   */
+  APPLICATION_ID((byte) 101);
+
+  private byte tagType;
+  private byte[] inBytes;
+
+  private AggregationCompactionDimension(byte tagType) {
+    this.tagType = tagType;
+    this.inBytes = Bytes.toBytes(this.name());
+  }
+
+  public Attribute getAttribute(String attributeValue) {
+    return new Attribute(this.name(), Bytes.toBytes(attributeValue));
+  }
+
+  public byte getTagType() {
+    return tagType;
+  }
+
+  public byte[] getInBytes() {
+    return this.inBytes.clone();
+  }
+
+  public static AggregationCompactionDimension getAggregationCompactionDimension(
+      String aggCompactDimStr) {
+    for (AggregationCompactionDimension aggDim : AggregationCompactionDimension
+        .values()) {
+      if (aggDim.name().equals(aggCompactDimStr)) {
+        return aggDim;
+      }
+    }
+    return null;
+  }
+}
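
An attribute minted here round-trips into an HBase cell Tag through the
utility added above (a sketch; the Map.Entry wrapper is only for
illustration):

    Attribute attr = AggregationCompactionDimension.APPLICATION_ID
        .getAttribute("application_1111111111_2222");
    // attr.getName() == "APPLICATION_ID"
    Tag tag = TimelineWriterUtils.getTagFromAttribute(
        new java.util.AbstractMap.SimpleEntry<String, byte[]>(
            attr.getName(), attr.getValue()));
    // tag.getType() == 101, the tag type of APPLICATION_ID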

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
new file mode 100644
index 0000000..c635ce6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the attributes to be set for puts into the {@link FlowRunTable}.
+ * The numbers used for tagType are prime numbers.
+ */
+public enum AggregationOperation {
+
+  /**
+   * When the flow was started.
+   */
+  MIN((byte) 71),
+
+  /**
+   * When it ended.
+   */
+  MAX((byte) 73),
+
+  /**
+   * The metrics of the flow.
+   */
+  SUM((byte) 79),
+
+  /**
+   * The final value of a metric, recorded when an application finishes.
+   */
+  SUM_FINAL((byte) 83),
+
+  /**
+   * Compaction marker.
+   */
+  COMPACT((byte) 89);
+
+  private byte tagType;
+  private byte[] inBytes;
+
+  private AggregationOperation(byte tagType) {
+    this.tagType = tagType;
+    this.inBytes = Bytes.toBytes(this.name());
+  }
+
+  public Attribute getAttribute() {
+    return new Attribute(this.name(), this.inBytes);
+  }
+
+  public byte getTagType() {
+    return tagType;
+  }
+
+  public byte[] getInBytes() {
+    return this.inBytes.clone();
+  }
+
+  /**
+   * Returns the {@link AggregationOperation} corresponding to the given name,
+   * or null if there is no match.
+   * @param aggOpStr the name of the aggregation operation
+   * @return the matching {@link AggregationOperation}, or null
+   */
+  public static AggregationOperation getAggregationOperation(String aggOpStr) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      if (aggOp.name().equals(aggOpStr)) {
+        return aggOp;
+      }
+    }
+    return null;
+  }
+
+}
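
A quick sketch of the lookup and attribute helpers above (illustrative only):

    // Round-trip an operation by name; an unknown name yields null.
    AggregationOperation op = AggregationOperation.getAggregationOperation("SUM");
    assert op == AggregationOperation.SUM;
    assert AggregationOperation.getAggregationOperation("UNKNOWN") == null;

    // The attribute for an operation carries the enum name as the attribute
    // name and the name's UTF-8 bytes as the value.
    Attribute sumAttr = AggregationOperation.SUM.getAttribute();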

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
new file mode 100644
index 0000000..d3de518
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+/**
+ * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}.
+ */
+public class Attribute {
+  private final String name;
+  private final byte[] value;
+
+  public Attribute(String name, byte[] value) {
+    this.name = name;
+    this.value = value.clone();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public byte[] getValue() {
+    return value.clone();
+  }
+}
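
Worth noting: both the constructor and getValue() clone the byte array, so an
Attribute cannot be mutated through a caller's reference. For example
(illustrative):

    byte[] raw = Bytes.toBytes("SUM");
    Attribute attr = new Attribute("SUM", raw);
    raw[0] = 0;                     // mutating the caller's array afterwards...
    // ...does not change attr; getValue() still returns the original bytes.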

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
new file mode 100644
index 0000000..d991b42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow activity table column families.
+ */
+public enum FlowActivityColumnFamily implements ColumnFamily<FlowActivityTable> {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value
+   *          create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private FlowActivityColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
new file mode 100644
index 0000000..b899e5c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies partially qualified columns for the {@link FlowActivityTable}.
+ */
+public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable> {
+
+  /**
+   * To store the run ids of the flows.
+   */
+  RUN_ID(FlowActivityColumnFamily.INFO, "r", null);
+
+  private final ColumnHelper<FlowActivityTable> column;
+  private final ColumnFamily<FlowActivityTable> columnFamily;
+
+  /**
+   * Can be null for those cases where the provided column qualifier is the
+   * entire column name.
+   */
+  private final String columnPrefix;
+  private final byte[] columnPrefixBytes;
+
+  private final AggregationOperation aggOp;
+
+  /**
+   * Private constructor, meant to be used by the enum definition.
+   *
+   * @param columnFamily
+   *          that this column is stored in.
+   * @param columnPrefix
+   *          for this column.
+   */
+  private FlowActivityColumnPrefix(
+      ColumnFamily<FlowActivityTable> columnFamily, String columnPrefix,
+      AggregationOperation aggOp) {
+    column = new ColumnHelper<FlowActivityTable>(columnFamily);
+    this.columnFamily = columnFamily;
+    this.columnPrefix = columnPrefix;
+    if (columnPrefix == null) {
+      this.columnPrefixBytes = null;
+    } else {
+      // Future-proof by ensuring the right column prefix hygiene.
+      this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE
+          .encode(columnPrefix));
+    }
+    this.aggOp = aggOp;
+  }
+
+  /**
+   * @return the column prefix value
+   */
+  public String getColumnPrefix() {
+    return columnPrefix;
+  }
+
+  public byte[] getColumnPrefixBytes() {
+    return columnPrefixBytes.clone();
+  }
+
+  public AggregationOperation getAttribute() {
+    return aggOp;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, byte[] qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, this.aggOp);
+    column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
+        combinedAttributes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String)
+   */
+  public Object readResult(Result result, String qualifier) throws IOException {
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    return column.readResult(result, columnQualifier);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResults(org.apache.hadoop.hbase.client.Result)
+   */
+  public Map<String, Object> readResults(Result result) throws IOException {
+    return column.readResults(result, columnPrefixBytes);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result)
+   */
+  public <T> NavigableMap<String, NavigableMap<Long, T>> readResultsWithTimestamps(
+      Result result) throws IOException {
+    return column.readResultsWithTimestamps(result, columnPrefixBytes);
+  }
+
+  /**
+   * Retrieve a {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true: {@code columnFor(x) == columnFor(y)}
+   * if and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null
+   */
+  public static final FlowActivityColumnPrefix columnFor(String columnPrefix) {
+
+    // Match column based on value, assume column family matches.
+    for (FlowActivityColumnPrefix flowActivityColPrefix : FlowActivityColumnPrefix
+        .values()) {
+      // Find a match based only on name.
+      if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) {
+        return flowActivityColPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve a {@link FlowActivityColumnPrefix} given a name, or null if there
+   * is no match. The following holds true:
+   * {@code columnFor(a,x) == columnFor(b,y)} if and only if
+   * {@code (x == y == null)} or {@code a.equals(b) && x.equals(y)}
+   *
+   * @param columnFamily
+   *          The columnFamily for which to retrieve the column.
+   * @param columnPrefix
+   *          Name of the column to retrieve
+   * @return the corresponding {@link FlowActivityColumnPrefix} or null if both
+   *         arguments don't match.
+   */
+  public static final FlowActivityColumnPrefix columnFor(
+      FlowActivityColumnFamily columnFamily, String columnPrefix) {
+
+    // TODO: needs unit test to confirm and need to update javadoc to explain
+    // null prefix case.
+
+    for (FlowActivityColumnPrefix flowActivityColumnPrefix : FlowActivityColumnPrefix
+        .values()) {
+      // Find a match based on column family and name.
+      if (flowActivityColumnPrefix.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (flowActivityColumnPrefix
+              .getColumnPrefix() == null)) || (flowActivityColumnPrefix
+              .getColumnPrefix().equals(columnPrefix)))) {
+        return flowActivityColumnPrefix;
+      }
+    }
+    // Default to null
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
+   * #store(byte[],
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.
+   * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object,
+   * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[])
+   */
+  @Override
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<FlowActivityTable> tableMutator, String qualifier,
+      Long timestamp, Object inputValue, Attribute... attributes)
+      throws IOException {
+    // Null check
+    if (qualifier == null) {
+      throw new IOException("Cannot store column with null qualifier in "
+          + tableMutator.getName().getNameAsString());
+    }
+
+    byte[] columnQualifier = ColumnHelper.getColumnQualifier(
+        this.columnPrefixBytes, qualifier);
+    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+        attributes, this.aggOp);
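+    // Note: unlike the byte[] qualifier variant above, the passed-in timestamp
+    // is not forwarded here; the cell is written with the default timestamp.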
+    column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
+        combinedAttributes);
+  }
+}
\ No newline at end of file
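
A hypothetical write-path sketch for the RUN_ID prefix above; rowKey, the
flowActivityTableMutator, and the values are assumed to be supplied by the
surrounding writer code:

    // The run id becomes the column qualifier under the "r" prefix; the null
    // timestamp lets the cell take the default timestamp (see note in store()).
    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTableMutator,
        String.valueOf(1002345678919L), null, Bytes.toBytes("version1"));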

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b37985e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
new file mode 100644
index 0000000..19e4e83
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+/**
+ * Represents a rowkey for the flow activity table.
+ */
+public class FlowActivityRowKey {
+
+  private final String clusterId;
+  private final long dayTs;
+  private final String userId;
+  private final String flowId;
+
+  public FlowActivityRowKey(String clusterId, long dayTs, String userId,
+      String flowId) {
+    this.clusterId = clusterId;
+    this.dayTs = dayTs;
+    this.userId = userId;
+    this.flowId = flowId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public long getDayTimestamp() {
+    return dayTs;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowId}
+   *
+   * The day timestamp is the top-of-the-day timestamp for the current time,
+   * so the key addresses the current day's record in the table.
+   * @param clusterId the cluster the flow ran on
+   * @param userId the user that ran the flow
+   * @param flowId the id of the flow
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String userId, String flowId) {
+    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+        .currentTimeMillis());
+    return getRowKey(clusterId, dayTs, userId, flowId);
+  }
+
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowId}
+   *
+   * @param clusterId the cluster the flow ran on
+   * @param dayTs the top-of-the-day timestamp for the day of interest
+   * @param userId the user that ran the flow
+   * @param flowId the id of the flow
+   * @return byte array for the row key
+   */
+  public static byte[] getRowKey(String clusterId, long dayTs, String userId,
+      String flowId) {
+    return Separator.QUALIFIERS.join(
+        Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
+        Bytes.toBytes(TimelineWriterUtils.invert(dayTs)),
+        Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
+        Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
+  }
+
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   */
+  public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 4) {
+      throw new IllegalArgumentException("the row key is not valid for "
+          + "a flow activity");
+    }
+
+    String clusterId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[0]));
+    long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
+    String userId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[2]));
+    String flowId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[3]));
+    return new FlowActivityRowKey(clusterId, dayTs, userId, flowId);
+  }
+}
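
A round trip through the helpers above, with illustrative values:

    // Construct a key for a given day and parse it back.
    byte[] rowKey = FlowActivityRowKey.getRowKey("cluster1", 1442448000000L,
        "user1", "flow_sleep_job");
    FlowActivityRowKey parsed = FlowActivityRowKey.parseRowKey(rowKey);
    // parsed.getClusterId()     -> "cluster1"
    // parsed.getDayTimestamp()  -> 1442448000000L
    // parsed.getUserId()        -> "user1"
    // parsed.getFlowId()        -> "flow_sleep_job"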