You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2016/06/10 15:37:38 UTC

[4/4] hadoop git commit: YARN-5170. Eliminate singleton converters and static method access

YARN-5170. Eliminate singleton converters and static method access


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8271a57
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8271a57
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8271a57

Branch: refs/heads/YARN-2928
Commit: b8271a578977e3dc9c94f72635375c7f76ab2cc1
Parents: c39b9c4
Author: Varun Saxena <va...@apache.org>
Authored: Fri Jun 10 20:56:55 2016 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Fri Jun 10 20:56:55 2016 +0530

----------------------------------------------------------------------
 .../storage/TestHBaseTimelineStorage.java       |  43 ++-
 .../flow/TestHBaseStorageFlowActivity.java      |  11 +-
 .../storage/flow/TestHBaseStorageFlowRun.java   |  11 +-
 .../flow/TestHBaseStorageFlowRunCompaction.java |  54 +--
 .../reader/filter/TimelineFilterUtils.java      |  38 ---
 .../storage/HBaseTimelineWriterImpl.java        | 226 ++++++++-----
 .../storage/application/ApplicationColumn.java  |   2 +-
 .../application/ApplicationColumnPrefix.java    |   3 +-
 .../storage/application/ApplicationRowKey.java  | 173 +++++++---
 .../application/ApplicationRowKeyConverter.java | 130 --------
 .../application/ApplicationRowKeyPrefix.java    |  69 ++++
 .../storage/apptoflow/AppToFlowRowKey.java      |  93 +++++-
 .../apptoflow/AppToFlowRowKeyConverter.java     |  96 ------
 .../storage/common/AppIdKeyConverter.java       |  11 +-
 .../storage/common/EventColumnName.java         |  15 +
 .../common/EventColumnNameConverter.java        |  12 +-
 .../storage/common/LongConverter.java           |  27 +-
 .../storage/common/LongKeyConverter.java        |  14 +-
 .../storage/common/RowKeyPrefix.java            |  42 +++
 .../storage/common/StringKeyConverter.java      |   7 +-
 .../storage/common/TimelineStorageUtils.java    | 171 ----------
 .../storage/entity/EntityColumn.java            |   3 +-
 .../storage/entity/EntityColumnPrefix.java      |   3 +-
 .../storage/entity/EntityRowKey.java            | 187 ++++++++---
 .../storage/entity/EntityRowKeyConverter.java   | 143 --------
 .../storage/entity/EntityRowKeyPrefix.java      |  74 +++++
 .../storage/flow/FlowActivityRowKey.java        | 162 ++++++---
 .../flow/FlowActivityRowKeyConverter.java       | 115 -------
 .../storage/flow/FlowActivityRowKeyPrefix.java  |  60 ++++
 .../storage/flow/FlowRunColumn.java             |   6 +-
 .../storage/flow/FlowRunColumnPrefix.java       |   2 +-
 .../storage/flow/FlowRunRowKey.java             | 129 +++++--
 .../storage/flow/FlowRunRowKeyConverter.java    | 120 -------
 .../storage/flow/FlowRunRowKeyPrefix.java       |  54 +++
 .../storage/reader/ApplicationEntityReader.java | 103 +++---
 .../reader/FlowActivityEntityReader.java        |  26 +-
 .../storage/reader/FlowRunEntityReader.java     |  80 ++---
 .../storage/reader/GenericEntityReader.java     | 333 ++++++++++---------
 .../storage/reader/TimelineEntityReader.java    | 136 +++++++-
 .../storage/common/TestKeyConverters.java       | 287 ++++------------
 .../storage/common/TestRowKeys.java             | 244 ++++++++++++++
 41 files changed, 1883 insertions(+), 1632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 7b647eb..fd5a7f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -70,12 +70,14 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -649,8 +651,9 @@ public class TestHBaseTimelineStorage {
 
       infoMap.putAll(infoMap1);
       // retrieve the row
-      byte[] rowKey =
-          ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId);
+      ApplicationRowKey applicationRowKey =
+          new ApplicationRowKey(cluster, user, flow, runid, appId);
+      byte[] rowKey = applicationRowKey.getRowKey();
       Get get = new Get(rowKey);
       get.setMaxVersions(Integer.MAX_VALUE);
       Connection conn = ConnectionFactory.createConnection(c1);
@@ -674,7 +677,7 @@ public class TestHBaseTimelineStorage {
 
       Map<String, Object> infoColumns =
           ApplicationColumnPrefix.INFO.readResults(result,
-              StringKeyConverter.getInstance());
+              new StringKeyConverter());
       assertEquals(infoMap, infoColumns);
 
       // Remember isRelatedTo is of type Map<String, Set<String>>
@@ -710,15 +713,16 @@ public class TestHBaseTimelineStorage {
         }
       }
 
+      KeyConverter<String> stringKeyConverter = new StringKeyConverter();
       // Configuration
       Map<String, Object> configColumns =
-          ApplicationColumnPrefix.CONFIG.readResults(result,
-              StringKeyConverter.getInstance());
+          ApplicationColumnPrefix.CONFIG
+              .readResults(result, stringKeyConverter);
       assertEquals(conf, configColumns);
 
       NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(
-              result, StringKeyConverter.getInstance());
+          ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result,
+              stringKeyConverter);
 
       NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
       matchMetrics(metricValues, metricMap);
@@ -908,7 +912,8 @@ public class TestHBaseTimelineStorage {
       // scan the table and see that entity exists
       Scan s = new Scan();
       byte[] startRow =
-          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+          new EntityRowKeyPrefix(cluster, user, flow, runid, appName)
+              .getRowKeyPrefix();
       s.setStartRow(startRow);
       s.setMaxVersions(Integer.MAX_VALUE);
       Connection conn = ConnectionFactory.createConnection(c1);
@@ -916,6 +921,7 @@ public class TestHBaseTimelineStorage {
 
       int rowCount = 0;
       int colCount = 0;
+      KeyConverter<String> stringKeyConverter = new StringKeyConverter();
       for (Result result : scanner) {
         if (result != null && !result.isEmpty()) {
           rowCount++;
@@ -936,7 +942,7 @@ public class TestHBaseTimelineStorage {
 
           Map<String, Object> infoColumns =
               EntityColumnPrefix.INFO.readResults(result,
-                  StringKeyConverter.getInstance());
+                  new StringKeyConverter());
           assertEquals(infoMap, infoColumns);
 
           // Remember isRelatedTo is of type Map<String, Set<String>>
@@ -975,13 +981,12 @@ public class TestHBaseTimelineStorage {
 
           // Configuration
           Map<String, Object> configColumns =
-              EntityColumnPrefix.CONFIG.readResults(result,
-                  StringKeyConverter.getInstance());
+              EntityColumnPrefix.CONFIG.readResults(result, stringKeyConverter);
           assertEquals(conf, configColumns);
 
           NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-              EntityColumnPrefix.METRIC.readResultsWithTimestamps(
-                  result, StringKeyConverter.getInstance());
+              EntityColumnPrefix.METRIC.readResultsWithTimestamps(result,
+                  stringKeyConverter);
 
           NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId());
           matchMetrics(metricValues, metricMap);
@@ -1116,8 +1121,9 @@ public class TestHBaseTimelineStorage {
       hbi.stop();
 
       // retrieve the row
-      byte[] rowKey =
-          ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName);
+      ApplicationRowKey applicationRowKey =
+          new ApplicationRowKey(cluster, user, flow, runid, appName);
+      byte[] rowKey = applicationRowKey.getRowKey();
       Get get = new Get(rowKey);
       get.setMaxVersions(Integer.MAX_VALUE);
       Connection conn = ConnectionFactory.createConnection(c1);
@@ -1132,7 +1138,7 @@ public class TestHBaseTimelineStorage {
 
       Map<EventColumnName, Object> eventsResult =
           ApplicationColumnPrefix.EVENT.readResults(result,
-              EventColumnNameConverter.getInstance());
+              new EventColumnNameConverter());
       // there should be only one event
       assertEquals(1, eventsResult.size());
       for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) {
@@ -1212,7 +1218,8 @@ public class TestHBaseTimelineStorage {
       String appName = ApplicationId.newInstance(System.currentTimeMillis() +
           9000000L, 1).toString();
       byte[] startRow =
-          EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
+          new EntityRowKeyPrefix(cluster, user, flow, runid, appName)
+              .getRowKeyPrefix();
       hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
       hbi.stop();
       // scan the table and see that entity exists
@@ -1234,7 +1241,7 @@ public class TestHBaseTimelineStorage {
 
           Map<EventColumnName, Object> eventsResult =
               EntityColumnPrefix.EVENT.readResults(result,
-                  EventColumnNameConverter.getInstance());
+                  new EventColumnNameConverter());
           // there should be only one event
           assertEquals(1, eventsResult.size());
           for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 589b78d..37490ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -158,7 +158,7 @@ public class TestHBaseStorageFlowActivity {
     Table table1 = conn.getTable(TableName
         .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
     byte[] startRow =
-        FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow);
+        new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey();
     Get g = new Get(startRow);
     Result r1 = table1.get(g);
     assertNotNull(r1);
@@ -278,11 +278,12 @@ public class TestHBaseStorageFlowActivity {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
     byte[] startRow =
-        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
+        new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey();
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
     byte[] stopRow =
-        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
+        new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow)
+            .getRowKey();
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName
@@ -420,11 +421,11 @@ public class TestHBaseStorageFlowActivity {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
     byte[] startRow =
-        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
+        new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey();
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
     byte[] stopRow =
-        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
+        new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow).getRowKey();
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index a443b50..328b25a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -59,8 +59,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -224,7 +224,7 @@ public class TestHBaseStorageFlowRun {
         .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
     // scan the table and see that we get back the right min and max
     // timestamps
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
     Get g = new Get(startRow);
     g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
         FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
@@ -354,10 +354,11 @@ public class TestHBaseStorageFlowRun {
       long runid, Configuration c1) throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowRunColumnFamily.INFO.getBytes());
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
-    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+    byte[] stopRow =
+        new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName
@@ -629,7 +630,7 @@ public class TestHBaseStorageFlowRun {
         .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
     // scan the table and see that we get back the right min and max
     // timestamps
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
     Get g = new Get(startRow);
     g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
         FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 6b0ee5c..e1bef53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -19,24 +19,24 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -46,21 +46,21 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 
 /**
  * Tests the FlowRun and FlowActivity Tables
@@ -194,10 +194,11 @@ public class TestHBaseStorageFlowRunCompaction {
       long runid, Configuration c1, int valueCount) throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowRunColumnFamily.INFO.getBytes());
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
-    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+    byte[] stopRow =
+        new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName
@@ -302,8 +303,9 @@ public class TestHBaseStorageFlowRunCompaction {
         cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
     currentColumnCells.add(c4);
 
-    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells,
+            new LongConverter(), currentTimestamp);
     assertNotNull(cells);
 
     // we should be getting back 4 cells
@@ -387,8 +389,9 @@ public class TestHBaseStorageFlowRunCompaction {
       cellTsNotFinal++;
     }
 
-    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells,
+            new LongConverter(), currentTimestamp);
     assertNotNull(cells);
 
     // we should be getting back count + 1 cells
@@ -489,8 +492,9 @@ public class TestHBaseStorageFlowRunCompaction {
       cellTsNotFinal++;
     }
 
-    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells,
+            new LongConverter(), currentTimestamp);
     assertNotNull(cells);
 
     // we should be getting back
@@ -554,7 +558,7 @@ public class TestHBaseStorageFlowRunCompaction {
         130L, Bytes.toBytes(cellValue2), tagByteArray);
     currentColumnCells.add(c2);
     List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+        new LongConverter(), currentTimestamp);
     assertNotNull(cells);
 
     // we should be getting back two cells
@@ -602,7 +606,7 @@ public class TestHBaseStorageFlowRunCompaction {
     currentColumnCells.add(c1);
 
     List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+        new LongConverter(), currentTimestamp);
     assertNotNull(cells);
     // we should not get the same cell back
     // but we get back the flow cell
@@ -639,7 +643,7 @@ public class TestHBaseStorageFlowRunCompaction {
         currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
     currentColumnCells.add(c1);
     List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+        new LongConverter(), currentTimestamp);
     assertNotNull(cells);
     // we expect the same cell back
     assertEquals(1, cells.size());
@@ -653,15 +657,19 @@ public class TestHBaseStorageFlowRunCompaction {
     FlowScanner fs = getFlowScannerForTestingCompaction();
     long currentTimestamp = System.currentTimeMillis();
 
+    LongConverter longConverter = new LongConverter();
+
     SortedSet<Cell> currentColumnCells = null;
-    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells, longConverter,
+            currentTimestamp);
     assertNotNull(cells);
     assertEquals(0, cells.size());
 
     currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
-    cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    cells =
+        fs.processSummationMajorCompaction(currentColumnCells, longConverter,
+            currentTimestamp);
     assertNotNull(cells);
     assertEquals(0, cells.size());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
index 036746b..cccae26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -31,14 +31,9 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 
@@ -209,39 +204,6 @@ public final class TimelineFilterUtils {
     return singleColValFilter;
   }
 
-  private static <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
-      String column) {
-    if (colPrefix == ApplicationColumnPrefix.EVENT ||
-        colPrefix == EntityColumnPrefix.EVENT) {
-      return EventColumnNameConverter.getInstance().encode(
-          new EventColumnName(column, null, null));
-    } else {
-      return StringKeyConverter.getInstance().encode(column);
-    }
-  }
-
-  /**
-   * Create a filter list of qualifier filters based on passed set of columns.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param colPrefix Column Prefix.
-   * @param columns set of column qualifiers.
-   * @return filter list.
-   */
-  public static <T> FilterList createFiltersFromColumnQualifiers(
-      ColumnPrefix<T> colPrefix, Set<String> columns) {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    for (String column : columns) {
-      // For columns which have compound column qualifiers (eg. events), we need
-      // to include the required separator.
-      byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
-      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
-          new BinaryPrefixComparator(
-              colPrefix.getColumnPrefixBytes(compoundColQual))));
-    }
-    return list;
-  }
-
   /**
    * Fetch columns from filter list containing exists and multivalue equality
    * filters. This is done to fetch only required columns from back-end and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index f8b5a65..3511a2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
@@ -45,11 +46,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -86,6 +86,17 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
   private TypedBufferedMutator<FlowRunTable> flowRunTable;
 
+  /**
+   * Used to convert strings key components to and from storage format.
+   */
+  private final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
+   * Used to convert Long key components to and from storage format.
+   */
+  private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
   public HBaseTimelineWriterImpl() {
     super(HBaseTimelineWriterImpl.class.getName());
   }
@@ -138,12 +149,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
       // if the entity is the application, the destination is the application
       // table
-      boolean isApplication = TimelineStorageUtils.isApplicationEntity(te);
-      byte[] rowKey = isApplication ?
-          ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
-              appId) :
-          EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
-              te.getType(), te.getId());
+      boolean isApplication = isApplicationEntity(te);
+      byte[] rowKey;
+      if (isApplication) {
+        ApplicationRowKey applicationRowKey =
+            new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
+                appId);
+        rowKey = applicationRowKey.getRowKey();
+      } else {
+        EntityRowKey entityRowKey =
+            new EntityRowKey(clusterId, userId, flowName, flowRunId, appId,
+                te.getType(), te.getId());
+        rowKey = entityRowKey.getRowKey();
+      }
 
       storeInfo(rowKey, te, flowVersion, isApplication);
       storeEvents(rowKey, te.getEvents(), isApplication);
@@ -152,102 +170,101 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       storeRelations(rowKey, te, isApplication);
 
       if (isApplication) {
-        TimelineEvent event = TimelineStorageUtils.getApplicationEvent(te,
-            ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+        TimelineEvent event =
+            getApplicationEvent(te,
+                ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+        FlowRunRowKey flowRunRowKey =
+            new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
         if (event != null) {
-          onApplicationCreated(clusterId, userId, flowName, flowVersion,
-              flowRunId, appId, te, event.getTimestamp());
+          AppToFlowRowKey appToFlowRowKey =
+              new AppToFlowRowKey(clusterId, appId);
+          onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId,
+              flowVersion, te, event.getTimestamp());
         }
         // if it's an application entity, store metrics
-        storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
-            appId, te);
+        storeFlowMetricsAppRunning(flowRunRowKey, appId, te);
         // if application has finished, store it's finish time and write final
         // values of all metrics
-        event = TimelineStorageUtils.getApplicationEvent(te,
+        event = getApplicationEvent(te,
             ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
         if (event != null) {
-          onApplicationFinished(clusterId, userId, flowName, flowVersion,
-              flowRunId, appId, te, event.getTimestamp());
+          onApplicationFinished(flowRunRowKey, flowVersion, appId, te,
+              event.getTimestamp());
         }
       }
     }
     return putStatus;
   }
 
-  private void onApplicationCreated(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te, long appCreatedTimeStamp) throws IOException {
+  private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
+      AppToFlowRowKey appToFlowRowKey, String appId, String userId,
+      String flowVersion, TimelineEntity te, long appCreatedTimeStamp)
+      throws IOException {
+
+    String flowName = flowRunRowKey.getFlowName();
+    Long flowRunId = flowRunRowKey.getFlowRunId();
+
     // store in App to flow table
-    storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te);
+    byte[] rowKey = appToFlowRowKey.getRowKey();
+    AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
+    AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
+    AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
+
     // store in flow run table
-    storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, te);
-    // store in flow activity table
-    storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, appCreatedTimeStamp);
-  }
+    storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
 
-  /*
-   * updates the {@link FlowActivityTable} with the Application TimelineEntity
-   * information
-   */
-  private void storeInFlowActivityTable(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      long activityTimeStamp) throws IOException {
-    byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp,
-        userId, flowName);
-    byte[] qualifier = LongKeyConverter.getInstance().encode(flowRunId);
-    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
-        null, flowVersion,
+    // store in flow activity table
+    byte[] flowActivityRowKeyBytes =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName)
+            .getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes,
+        flowActivityTable, qualifier, null, flowVersion,
         AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
   }
 
   /*
    * updates the {@link FlowRunTable} with Application Created information
    */
-  private void storeAppCreatedInFlowRunTable(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te) throws IOException {
-    byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
-        flowRunId);
+  private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
     FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
         te.getCreatedTime(),
         AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
   }
 
-  private void storeInAppToFlowTable(String clusterId, String userId,
-      String flowName, long flowRunId, String appId, TimelineEntity te)
-      throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
-    AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
-    AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
-    AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
-  }
 
   /*
    * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
    * application has finished
    */
-  private void onApplicationFinished(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te, long appFinishedTimeStamp) throws IOException {
+  private void onApplicationFinished(FlowRunRowKey flowRunRowKey,
+      String flowVersion, String appId, TimelineEntity te,
+      long appFinishedTimeStamp) throws IOException {
     // store in flow run table
-    storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
-        appId, te, appFinishedTimeStamp);
+    storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te,
+        appFinishedTimeStamp);
 
     // indicate in the flow activity table that the app has finished
-    storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, appFinishedTimeStamp);
+    byte[] rowKey =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(),
+            appFinishedTimeStamp, flowRunRowKey.getUserId(),
+            flowRunRowKey.getFlowName()).getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+        null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
   }
 
   /*
    * Update the {@link FlowRunTable} with Application Finished information
    */
-  private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
-      String flowName, long flowRunId, String appId, TimelineEntity te,
-      long appFinishedTimeStamp) throws IOException {
-    byte[] rowKey =
-        FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
+  private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te, long appFinishedTimeStamp)
+      throws IOException {
+    byte[] rowKey = flowRunRowKey.getRowKey();
     Attribute attributeAppId =
         AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
     FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
@@ -264,13 +281,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   /*
    * Updates the {@link FlowRunTable} with Application Metrics
    */
-  private void storeFlowMetricsAppRunning(String clusterId, String userId,
-      String flowName, long flowRunId, String appId, TimelineEntity te)
-      throws IOException {
+  private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey,
+      String appId, TimelineEntity te) throws IOException {
     Set<TimelineMetric> metrics = te.getMetrics();
     if (metrics != null) {
-      byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
-          flowRunId);
+      byte[] rowKey = flowRunRowKey.getRowKey();
       storeFlowMetrics(rowKey, metrics,
           AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
           AggregationOperation.SUM.getAttribute());
@@ -280,8 +295,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
   private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
       Attribute... attributes) throws IOException {
     for (TimelineMetric metric : metrics) {
-      byte[] metricColumnQualifier =
-          StringKeyConverter.getInstance().encode(metric.getId());
+      byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId());
       Map<Long, Number> timeseries = metric.getValues();
       for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
         Long timestamp = timeseriesEntry.getKey();
@@ -320,8 +334,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       String compoundValue =
           Separator.VALUES.joinEncoded(connectedEntity.getValue());
       columnPrefix.store(rowKey, table,
-          StringKeyConverter.getInstance().encode(connectedEntity.getKey()),
-          null, compoundValue);
+          stringKeyConverter.encode(connectedEntity.getKey()), null,
+          compoundValue);
     }
   }
 
@@ -341,7 +355,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       if (info != null) {
         for (Map.Entry<String, Object> entry : info.entrySet()) {
           ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
-              StringKeyConverter.getInstance().encode(entry.getKey()), null,
+              stringKeyConverter.encode(entry.getKey()), null,
               entry.getValue());
         }
       }
@@ -355,7 +369,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       if (info != null) {
         for (Map.Entry<String, Object> entry : info.entrySet()) {
           EntityColumnPrefix.INFO.store(rowKey, entityTable,
-              StringKeyConverter.getInstance().encode(entry.getKey()), null,
+              stringKeyConverter.encode(entry.getKey()), null,
               entry.getValue());
         }
       }
@@ -371,8 +385,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       return;
     }
     for (Map.Entry<String, String> entry : config.entrySet()) {
-      byte[] configKey =
-          StringKeyConverter.getInstance().encode(entry.getKey());
+      byte[] configKey = stringKeyConverter.encode(entry.getKey());
       if (isApplication) {
         ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
             configKey, null, entry.getValue());
@@ -392,7 +405,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
         byte[] metricColumnQualifier =
-            StringKeyConverter.getInstance().encode(metric.getId());
+            stringKeyConverter.encode(metric.getId());
         Map<Long, Number> timeseries = metric.getValues();
         for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
           Long timestamp = timeseriesEntry.getKey();
@@ -425,12 +438,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
                   "! Using the current timestamp");
               eventTimestamp = System.currentTimeMillis();
             }
-            EventColumnNameConverter converter =
-                EventColumnNameConverter.getInstance();
             Map<String, Object> eventInfo = event.getInfo();
             if ((eventInfo == null) || (eventInfo.size() == 0)) {
-              byte[] columnQualifierBytes = converter.encode(
-                  new EventColumnName(eventId, eventTimestamp, null));
+              byte[] columnQualifierBytes =
+                  new EventColumnName(eventId, eventTimestamp, null)
+                      .getColumnQualifier();
               if (isApplication) {
                 ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
                     columnQualifierBytes, null, Separator.EMPTY_BYTES);
@@ -441,9 +453,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
             } else {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
                 // eventId=infoKey
-                byte[] columnQualifierBytes = converter.encode(
-                    new EventColumnName(eventId, eventTimestamp,
-                        info.getKey()));
+                byte[] columnQualifierBytes =
+                    new EventColumnName(eventId, eventTimestamp, info.getKey())
+                        .getColumnQualifier();
                 if (isApplication) {
                   ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
                       columnQualifierBytes, null, info.getValue());
@@ -459,12 +471,56 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     }
   }
 
+  /**
+   * Checks if the input TimelineEntity object is an ApplicationEntity.
+   *
+   * @param te TimelineEntity object.
+   * @return true if input is an ApplicationEntity, false otherwise
+   */
+  static boolean isApplicationEntity(TimelineEntity te) {
+    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  }
+
+  /**
+   * @param te TimelineEntity object.
+   * @param eventId event with this id needs to be fetched
+   * @return TimelineEvent if TimelineEntity contains the desired event.
+   */
+  private static TimelineEvent getApplicationEvent(TimelineEntity te,
+      String eventId) {
+    if (isApplicationEntity(te)) {
+      for (TimelineEvent event : te.getEvents()) {
+        if (event.getId().equals(eventId)) {
+          return event;
+        }
+      }
+    }
+    return null;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage
+   * .TimelineWriter#aggregate
+   * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity,
+   * org.apache
+   * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack)
+   */
   @Override
   public TimelineWriteResponse aggregate(TimelineEntity data,
       TimelineAggregationTrack track) throws IOException {
     return null;
   }
 
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush
+   * ()
+   */
   @Override
   public void flush() throws IOException {
     // flush all buffered mutators

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
index 80fcf8c..dde3911 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -45,7 +45,7 @@ public enum ApplicationColumn implements Column<ApplicationTable> {
    * When the application was created.
    */
   CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
-      LongConverter.getInstance()),
+      new LongConverter()),
 
   /**
    * The version of the flow that this app belongs to.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index 0febc67..42488f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -67,8 +67,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {
   /**
    * Metrics are stored with the metric name as the column name.
    */
-  METRIC(ApplicationColumnFamily.METRICS, null,
-      LongConverter.getInstance());
+  METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter());
 
   private final ColumnHelper<ApplicationTable> column;
   private final ColumnFamily<ApplicationTable> columnFamily;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index e476b21..da62fdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
 /**
  * Represents a rowkey for the application table.
  */
@@ -27,6 +33,8 @@ public class ApplicationRowKey {
   private final String flowName;
   private final Long flowRunId;
   private final String appId;
+  private final KeyConverter<ApplicationRowKey> appRowKeyConverter =
+      new ApplicationRowKeyConverter();
 
   public ApplicationRowKey(String clusterId, String userId, String flowName,
       Long flowRunId, String appId) {
@@ -58,60 +66,141 @@ public class ApplicationRowKey {
   }
 
   /**
-   * Constructs a row key prefix for the application table as follows:
-   * {@code clusterId!userName!flowName!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName) {
-    return ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(clusterId, userId, flowName, null, null));
-  }
-
-  /**
-   * Constructs a row key prefix for the application table as follows:
-   * {@code clusterId!userName!flowName!flowRunId!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @param flowRunId Run Id for the flow.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName, Long flowRunId) {
-    return ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(clusterId, userId, flowName, flowRunId, null));
-  }
-
-  /**
    * Constructs a row key for the application table as follows:
    * {@code clusterId!userName!flowName!flowRunId!AppId}.
    *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId App Id.
    * @return byte array with the row key
    */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId) {
-    return ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId));
+  public byte[] getRowKey() {
+    return appRowKeyConverter.encode(this);
   }
 
   /**
    * Given the raw row key as bytes, returns the row key as an object.
    *
-   * @param rowKey Byte representation  of row key.
+   * @param rowKey Byte representation of row key.
    * @return An <cite>ApplicationRowKey</cite> object.
    */
   public static ApplicationRowKey parseRowKey(byte[] rowKey) {
-    return ApplicationRowKeyConverter.getInstance().decode(rowKey);
+    return new ApplicationRowKeyConverter().decode(rowKey);
   }
+
+  /**
+   * Encodes and decodes row key for application table. The row key is of the
+   * form: clusterId!userName!flowName!flowRunId!appId. flowRunId is a long,
+   * appId is encoded and decoded using {@link AppIdKeyConverter} and rest are
+   * strings.
+   * <p>
+   */
+  final private static class ApplicationRowKeyConverter implements
+      KeyConverter<ApplicationRowKey> {
+
+    private final KeyConverter<String> appIDKeyConverter =
+        new AppIdKeyConverter();
+
+    /**
+     * Intended for use in ApplicationRowKey only.
+     */
+    private ApplicationRowKeyConverter() {
+    }
+
+    /**
+     * Application row key is of the form
+     * clusterId!userName!flowName!flowRunId!appId with each segment separated
+     * by !. The sizes below indicate sizes of each one of these segements in
+     * sequence. clusterId, userName and flowName are strings. flowrunId is a
+     * long hence 8 bytes in size. app id is represented as 12 bytes with
+     * cluster timestamp part of appid takes 8 bytes(long) and seq id takes 4
+     * bytes(int). Strings are variable in size (i.e. end whenever separator is
+     * encountered). This is used while decoding and helps in determining where
+     * to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+        AppIdKeyConverter.getKeySize() };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes ApplicationRowKey object into a byte array with each
+     * component/field in ApplicationRowKey separated by Separator#QUALIFIERS.
+     * This leads to an application table row key of the form
+     * clusterId!userName!flowName!flowRunId!appId If flowRunId in passed
+     * ApplicationRowKey object is null (and the fields preceding it i.e.
+     * clusterId, userId and flowName are not null), this returns a row key
+     * prefix of the form clusterId!userName!flowName! and if appId in
+     * ApplicationRowKey is null (other 4 components all are not null), this
+     * returns a row key prefix of the form
+     * clusterId!userName!flowName!flowRunId! flowRunId is inverted while
+     * encoding as it helps maintain a descending order for row keys in the
+     * application table.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(ApplicationRowKey rowKey) {
+      byte[] cluster =
+          Separator.encode(rowKey.getClusterId(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] user =
+          Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
+              Separator.QUALIFIERS);
+      byte[] flow =
+          Separator.encode(rowKey.getFlowName(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] first = Separator.QUALIFIERS.join(cluster, user, flow);
+      // Note that flowRunId is a long, so we can't encode them all at the same
+      // time.
+      if (rowKey.getFlowRunId() == null) {
+        return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
+      }
+      byte[] second =
+          Bytes.toBytes(LongConverter.invertLong(
+              rowKey.getFlowRunId()));
+      if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) {
+        return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES);
+      }
+      byte[] third = appIDKeyConverter.encode(rowKey.getAppId());
+      return Separator.QUALIFIERS.join(first, second, third);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Decodes an application row key of the form
+     * clusterId!userName!flowName!flowRunId!appId represented in byte format
+     * and converts it into an ApplicationRowKey object.flowRunId is inverted
+     * while decoding as it was inverted while encoding.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public ApplicationRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 5) {
+        throw new IllegalArgumentException("the row key is not valid for "
+            + "an application");
+      }
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String userId =
+          Separator.decode(Bytes.toString(rowKeyComponents[1]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String flowName =
+          Separator.decode(Bytes.toString(rowKeyComponents[2]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      Long flowRunId =
+          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
+      String appId = appIDKeyConverter.decode(rowKeyComponents[4]);
+      return new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
+          appId);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
deleted file mode 100644
index 3b054a5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.application;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-/**
- * Encodes and decodes row key for application table.
- * The row key is of the form : clusterId!userName!flowName!flowRunId!appId.
- * flowRunId is a long, appId is encoded and decoded using
- * {@link AppIdKeyConverter} and rest are strings.
- */
-public final class ApplicationRowKeyConverter implements
-    KeyConverter<ApplicationRowKey> {
-  private static final ApplicationRowKeyConverter INSTANCE =
-      new ApplicationRowKeyConverter();
-
-  public static ApplicationRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private ApplicationRowKeyConverter() {
-  }
-
-  // Application row key is of the form
-  // clusterId!userName!flowName!flowRunId!appId with each segment separated
-  // by !. The sizes below indicate sizes of each one of these segements in
-  // sequence. clusterId, userName and flowName are strings. flowrunId is a long
-  // hence 8 bytes in size. app id is represented as 12 bytes with cluster
-  // timestamp part of appid being 8 bytes(long) and seq id being 4 bytes(int).
-  // Strings are variable in size (i.e. end whenever separator is encountered).
-  // This is used while decoding and helps in determining where to split.
-  private static final int[] SEGMENT_SIZES = {
-      Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-      Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize() };
-
-  /*
-   * (non-Javadoc)
-   *
-   * Encodes ApplicationRowKey object into a byte array with each
-   * component/field in ApplicationRowKey separated by Separator#QUALIFIERS.
-   * This leads to an application table row key of the form
-   * clusterId!userName!flowName!flowRunId!appId
-   * If flowRunId in passed ApplicationRowKey object is null (and the fields
-   * preceding it i.e. clusterId, userId and flowName are not null), this
-   * returns a row key prefix of the form clusterId!userName!flowName! and if
-   * appId in ApplicationRowKey is null (other 4 components are not null), this
-   * returns a row key prefix of the form clusterId!userName!flowName!flowRunId!
-   * flowRunId is inverted while encoding as it helps maintain a descending
-   * order for row keys in application table.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #encode(java.lang.Object)
-   */
-  @Override
-  public byte[] encode(ApplicationRowKey rowKey) {
-    byte[] cluster = Separator.encode(rowKey.getClusterId(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] user = Separator.encode(rowKey.getUserId(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] flow = Separator.encode(rowKey.getFlowName(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] first = Separator.QUALIFIERS.join(cluster, user, flow);
-    // Note that flowRunId is a long, so we can't encode them all at the same
-    // time.
-    if (rowKey.getFlowRunId() == null) {
-      return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
-    }
-    byte[] second = Bytes.toBytes(
-        TimelineStorageUtils.invertLong(rowKey.getFlowRunId()));
-    if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) {
-      return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES);
-    }
-    byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
-    return Separator.QUALIFIERS.join(first, second, third);
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * Decodes an application row key of the form
-   * clusterId!userName!flowName!flowRunId!appId represented in byte format and
-   * converts it into an ApplicationRowKey object.flowRunId is inverted while
-   * decoding as it was inverted while encoding.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #decode(byte[])
-   */
-  @Override
-  public ApplicationRowKey decode(byte[] rowKey) {
-    byte[][] rowKeyComponents =
-        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
-    if (rowKeyComponents.length != 5) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "an application");
-    }
-    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    Long flowRunId =
-        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
-    return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java
new file mode 100644
index 0000000..f61b0e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.application;
+
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix;
+
+/**
+ * Represents a partial rowkey (without flowName or without flowName and
+ * flowRunId) for the application table.
+ */
+public class ApplicationRowKeyPrefix extends ApplicationRowKey implements
+    RowKeyPrefix<ApplicationRowKey> {
+
+  /**
+   * Creates a prefix which generates the following rowKeyPrefixes for the
+   * application table: {@code clusterId!userName!flowName!}.
+   *
+   * @param clusterId the cluster on which applications ran
+   * @param userId the user that ran applications
+   * @param flowName the name of the flow that was run by the user on the
+   *          cluster
+   */
+  public ApplicationRowKeyPrefix(String clusterId, String userId,
+      String flowName) {
+    super(clusterId, userId, flowName, null, null);
+  }
+
+  /**
+   * Creates a prefix which generates the following rowKeyPrefixes for the
+   * application table: {@code clusterId!userName!flowName!flowRunId!}.
+   *
+   * @param clusterId identifying the cluster
+   * @param userId identifying the user
+   * @param flowName identifying the flow
+   * @param flowRunId identifying the instance of this flow
+   */
+  public ApplicationRowKeyPrefix(String clusterId, String userId,
+      String flowName, Long flowRunId) {
+    super(clusterId, userId, flowName, flowRunId, null);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.application.
+   * RowKeyPrefix#getRowKeyPrefix()
+   */
+  @Override
+  public byte[] getRowKeyPrefix() {
+    return super.getRowKey();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index 6a38e32..8df4407 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -17,12 +17,19 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
 /**
  * Represents a rowkey for the app_flow table.
  */
 public class AppToFlowRowKey {
   private final String clusterId;
   private final String appId;
+  private final KeyConverter<AppToFlowRowKey> appToFlowRowKeyConverter =
+      new AppToFlowRowKeyConverter();
 
   public AppToFlowRowKey(String clusterId, String appId) {
     this.clusterId = clusterId;
@@ -41,13 +48,10 @@ public class AppToFlowRowKey {
    * Constructs a row key prefix for the app_flow table as follows:
    * {@code clusterId!AppId}.
    *
-   * @param clusterId Cluster Id.
-   * @param appId Application Id.
    * @return byte array with the row key
    */
-  public static byte[] getRowKey(String clusterId, String appId) {
-    return AppToFlowRowKeyConverter.getInstance().encode(
-        new AppToFlowRowKey(clusterId, appId));
+  public  byte[] getRowKey() {
+    return appToFlowRowKeyConverter.encode(this);
   }
 
   /**
@@ -57,6 +61,83 @@ public class AppToFlowRowKey {
    * @return an <cite>AppToFlowRowKey</cite> object.
    */
   public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
-    return AppToFlowRowKeyConverter.getInstance().decode(rowKey);
+    return new AppToFlowRowKeyConverter().decode(rowKey);
+  }
+
+  /**
+   * Encodes and decodes row key for app_flow table. The row key is of the form
+   * clusterId!appId. clusterId is a string and appId is encoded/decoded using
+   * {@link AppIdKeyConverter}.
+   * <p>
+   */
+  final private static class AppToFlowRowKeyConverter implements
+      KeyConverter<AppToFlowRowKey> {
+
+    private final KeyConverter<String> appIDKeyConverter =
+        new AppIdKeyConverter();
+
+    /**
+     * Intended for use in AppToFlowRowKey only.
+     */
+    private AppToFlowRowKeyConverter() {
+    }
+
+
+    /**
+     * App to flow row key is of the form clusterId!appId with the 2 segments
+     * separated by !. The sizes below indicate sizes of both of these segments
+     * in sequence. clusterId is a string. appId is represented as 12 bytes w.
+     * cluster Timestamp part of appid taking 8 bytes(long) and seq id taking 4
+     * bytes(int). Strings are variable in size (i.e. end whenever separator is
+     * encountered). This is used while decoding and helps in determining where
+     * to split.
+     */
+    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
+        Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT };
+
+    /*
+     * (non-Javadoc)
+     *
+     * Encodes AppToFlowRowKey object into a byte array with each
+     * component/field in AppToFlowRowKey separated by Separator#QUALIFIERS.
+     * This leads to an app to flow table row key of the form clusterId!appId
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#encode(java.lang.Object)
+     */
+    @Override
+    public byte[] encode(AppToFlowRowKey rowKey) {
+      byte[] first =
+          Separator.encode(rowKey.getClusterId(), Separator.SPACE,
+              Separator.TAB, Separator.QUALIFIERS);
+      byte[] second = appIDKeyConverter.encode(rowKey.getAppId());
+      return Separator.QUALIFIERS.join(first, second);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Decodes an app to flow row key of the form clusterId!appId represented
+     * in byte format and converts it into an AppToFlowRowKey object.
+     *
+     * @see
+     * org.apache.hadoop.yarn.server.timelineservice.storage.common
+     * .KeyConverter#decode(byte[])
+     */
+    @Override
+    public AppToFlowRowKey decode(byte[] rowKey) {
+      byte[][] rowKeyComponents =
+          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
+      if (rowKeyComponents.length != 2) {
+        throw new IllegalArgumentException("the row key is not valid for "
+            + "the app-to-flow table");
+      }
+      String clusterId =
+          Separator.decode(Bytes.toString(rowKeyComponents[0]),
+              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
+      String appId = appIDKeyConverter.decode(rowKeyComponents[1]);
+      return new AppToFlowRowKey(clusterId, appId);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
deleted file mode 100644
index 0f0b879d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-
-/**
- * Encodes and decodes row key for app_flow table.
- * The row key is of the form : clusterId!appId.
- * clusterId is a string and appId is encoded/decoded using
- * {@link AppIdKeyConverter}.
- */
-public final class AppToFlowRowKeyConverter
-    implements KeyConverter<AppToFlowRowKey> {
-  private static final AppToFlowRowKeyConverter INSTANCE =
-      new AppToFlowRowKeyConverter();
-
-  public static AppToFlowRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private AppToFlowRowKeyConverter() {
-  }
-
-  // App to flow row key is of the form clusterId!appId with the 2 segments
-  // separated by !. The sizes below indicate sizes of both of these segments
-  // in sequence. clusterId is a string. appId is represented as 12 bytes with
-  // cluster Timestamp part of appid being 8 bytes(long) and seq id being 4
-  // bytes(int).
-  // Strings are variable in size (i.e. end whenever separator is encountered).
-  // This is used while decoding and helps in determining where to split.
-  private static final int[] SEGMENT_SIZES = {
-      Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT };
-
-  /*
-   * (non-Javadoc)
-   *
-   * Encodes AppToFlowRowKey object into a byte array with each component/field
-   * in AppToFlowRowKey separated by Separator#QUALIFIERS. This leads to an
-   * app to flow table row key of the form clusterId!appId
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #encode(java.lang.Object)
-   */
-  @Override
-  public byte[] encode(AppToFlowRowKey rowKey) {
-    byte[] first = Separator.encode(rowKey.getClusterId(),
-        Separator.SPACE, Separator.TAB, Separator.QUALIFIERS);
-    byte[] second = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
-    return Separator.QUALIFIERS.join(first, second);
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * Decodes an app to flow row key of the form clusterId!appId represented in
-   * byte format and converts it into an AppToFlowRowKey object.
-   *
-   * @see
-   * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter
-   * #decode(byte[])
-   */
-  @Override
-  public AppToFlowRowKey decode(byte[] rowKey) {
-    byte[][] rowKeyComponents =
-        Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
-    if (rowKeyComponents.length != 2) {
-      throw new IllegalArgumentException("the row key is not valid for " +
-          "the app-to-flow table");
-    }
-    String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
-        Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[1]);
-    return new AppToFlowRowKey(clusterId, appId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
index a173b0f..f5f7aa6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
@@ -28,13 +28,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
  * (long - 8 bytes) followed by sequence id section of app id (int - 4 bytes).
  */
 public final class AppIdKeyConverter implements KeyConverter<String> {
-  private static final AppIdKeyConverter INSTANCE = new AppIdKeyConverter();
 
-  public static AppIdKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private AppIdKeyConverter() {
+  public AppIdKeyConverter() {
   }
 
   /*
@@ -58,7 +53,7 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
     ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
     byte[] appIdBytes = new byte[getKeySize()];
     byte[] clusterTs = Bytes.toBytes(
-        TimelineStorageUtils.invertLong(appId.getClusterTimestamp()));
+        LongConverter.invertLong(appId.getClusterTimestamp()));
     System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
     byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId()));
     System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
@@ -83,7 +78,7 @@ public final class AppIdKeyConverter implements KeyConverter<String> {
     if (appIdBytes.length != getKeySize()) {
       throw new IllegalArgumentException("Invalid app id in byte format");
     }
-    long clusterTs = TimelineStorageUtils.invertLong(
+    long clusterTs = LongConverter.invertLong(
         Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
     int seqId = TimelineStorageUtils.invertInt(
         Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8271a57/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
index 6018f86..8445575 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java
@@ -26,6 +26,8 @@ public class EventColumnName {
   private final String id;
   private final Long timestamp;
   private final String infoKey;
+  private final KeyConverter<EventColumnName> eventColumnNameConverter =
+      new EventColumnNameConverter();
 
   public EventColumnName(String id, Long timestamp, String infoKey) {
     this.id = id;
@@ -45,4 +47,17 @@ public class EventColumnName {
     return infoKey;
   }
 
+  /**
+   * @return a byte array with each components/fields separated by
+   *         Separator#VALUES. This leads to an event column name of the form
+   *         eventId=timestamp=infokey. If both timestamp and infokey are null,
+   *         then a qualifier of the form eventId=timestamp= is returned. If
+   *         only infokey is null, then a qualifier of the form eventId= is
+   *         returned. These prefix forms are useful for queries that intend to
+   *         retrieve more than one specific column name.
+   */
+  public byte[] getColumnQualifier() {
+    return eventColumnNameConverter.encode(this);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org