Posted to common-commits@hadoop.apache.org by gt...@apache.org on 2016/01/20 10:13:44 UTC

[27/50] [abbrv] hadoop git commit: YARN-4178. [storage implementation] app id as string in row keys can cause incorrect ordering (Varun Saxena via sjlee)

YARN-4178. [storage implementation] app id as string in row keys can cause incorrect ordering (Varun Saxena via sjlee)

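For context on the bug: application ids are strings of the form
application_{cluster timestamp}_{sequence id with min 4 digits}, and once a
sequence id grows past four digits its zero padding overflows, so comparing
the raw strings (as the old row keys did) no longer matches numeric order.
A minimal, hypothetical illustration (not part of this commit):

    // Why app ids must not be compared as strings: past 4 digits the
    // lexicographic order of sequence ids diverges from numeric order,
    // so HBase row keys built from the raw string sort incorrectly.
    public class AppIdOrderingBug {
      public static void main(String[] args) {
        String a = "application_1451592000000_9999";
        String b = "application_1451592000000_10000";
        // b is numerically later, yet sorts before a as a string,
        // because '1' < '9' at the first differing character.
        System.out.println(a.compareTo(b) > 0); // prints true
      }
    }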

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fdaa1e4e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fdaa1e4e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fdaa1e4e

Branch: refs/heads/feature-YARN-2928
Commit: fdaa1e4e16e71d9793b4302a50a4335356e3c70f
Parents: 2b6784a
Author: Sangjin Lee <sj...@apache.org>
Authored: Tue Oct 6 16:06:28 2015 -0700
Committer: Li Lu <gt...@apache.org>
Committed: Tue Jan 19 17:41:58 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../storage/ApplicationEntityReader.java        |  14 +-
 .../storage/FileSystemTimelineReaderImpl.java   |  14 +-
 .../storage/GenericEntityReader.java            |  17 +-
 .../storage/HBaseTimelineWriterImpl.java        |  20 +-
 .../storage/application/ApplicationRowKey.java  |  13 +-
 .../storage/apptoflow/AppToFlowRowKey.java      |   7 +-
 .../storage/common/Separator.java               |   4 +-
 .../storage/common/TimelineReaderUtils.java     | 112 -----
 .../storage/common/TimelineStorageUtils.java    | 475 +++++++++++++++++++
 .../storage/common/TimelineWriterUtils.java     | 328 -------------
 .../storage/entity/EntityRowKey.java            |  32 +-
 .../storage/flow/FlowActivityColumnPrefix.java  |   6 +-
 .../storage/flow/FlowActivityRowKey.java        |   9 +-
 .../storage/flow/FlowRunColumn.java             |   4 +-
 .../storage/flow/FlowRunColumnPrefix.java       |   6 +-
 .../storage/flow/FlowRunCoprocessor.java        |   4 +-
 .../storage/flow/FlowRunRowKey.java             |   6 +-
 .../storage/flow/FlowScanner.java               |   6 +-
 .../storage/TestHBaseTimelineStorage.java       |  13 +-
 .../common/TestTimelineStorageUtils.java        |  56 +++
 .../storage/common/TestTimelineWriterUtils.java |  29 --
 .../flow/TestHBaseStorageFlowActivity.java      |  10 +-
 23 files changed, 629 insertions(+), 559 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 902d05e..4ed4949 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -168,6 +168,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4058. Miscellaneous issues in NodeManager project (Naganarasimha G R
     via sjlee)
 
+    YARN-4178. [storage implementation] app id as string in row keys can cause
+    incorrect ordering (Varun Saxena via sjlee)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
index 61954e1..6d1a2ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -182,7 +182,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
       readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
           true);
-      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
           entity.getIsRelatedToEntities(), isRelatedTo)) {
         return null;
       }
@@ -198,7 +198,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
       readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
           false);
-      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
           entity.getRelatesToEntities(), relatesTo)) {
         return null;
       }
@@ -214,7 +214,7 @@ class ApplicationEntityReader extends GenericEntityReader {
         fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
       readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
       if (checkInfo &&
-          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
         return null;
       }
       if (!fieldsToRetrieve.contains(Field.ALL) &&
@@ -228,7 +228,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
       readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+      if (checkConfigs && !TimelineStorageUtils.matchFilters(
           entity.getConfigs(), configFilters)) {
         return null;
       }
@@ -243,7 +243,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
       readEvents(entity, result, true);
-      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
           entity.getEvents(), eventFilters)) {
         return null;
       }
@@ -258,7 +258,7 @@ class ApplicationEntityReader extends GenericEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
       readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
           entity.getMetrics(), metricFilters)) {
         return null;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index 626c770..30d1d00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -321,31 +321,31 @@ public class FileSystemTimelineReaderImpl extends AbstractService
           continue;
         }
         if (relatesTo != null && !relatesTo.isEmpty() &&
-            !TimelineReaderUtils
+            !TimelineStorageUtils
                 .matchRelations(entity.getRelatesToEntities(), relatesTo)) {
           continue;
         }
         if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
-            !TimelineReaderUtils
+            !TimelineStorageUtils
                 .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
           continue;
         }
         if (infoFilters != null && !infoFilters.isEmpty() &&
-            !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+            !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
           continue;
         }
         if (configFilters != null && !configFilters.isEmpty() &&
-            !TimelineReaderUtils.matchFilters(
+            !TimelineStorageUtils.matchFilters(
                 entity.getConfigs(), configFilters)) {
           continue;
         }
         if (metricFilters != null && !metricFilters.isEmpty() &&
-            !TimelineReaderUtils.matchMetricFilters(
+            !TimelineStorageUtils.matchMetricFilters(
                 entity.getMetrics(), metricFilters)) {
           continue;
         }
         if (eventFilters != null && !eventFilters.isEmpty() &&
-            !TimelineReaderUtils.matchEventFilters(
+            !TimelineStorageUtils.matchEventFilters(
                 entity.getEvents(), eventFilters)) {
           continue;
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
index 42079d7..c18966f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
@@ -44,8 +44,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
@@ -220,7 +219,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
       readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
           entity.getIsRelatedToEntities(), isRelatedTo)) {
         return null;
       }
@@ -235,7 +234,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
       readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
           entity.getRelatesToEntities(), relatesTo)) {
         return null;
       }
@@ -251,7 +250,7 @@ class GenericEntityReader extends TimelineEntityReader {
         fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
       readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
       if (checkInfo &&
-          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
         return null;
       }
       if (!fieldsToRetrieve.contains(Field.ALL) &&
@@ -265,7 +264,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
       readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+      if (checkConfigs && !TimelineStorageUtils.matchFilters(
           entity.getConfigs(), configFilters)) {
         return null;
       }
@@ -280,7 +279,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
       readEvents(entity, result, false);
-      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
           entity.getEvents(), eventFilters)) {
         return null;
       }
@@ -295,7 +294,7 @@ class GenericEntityReader extends TimelineEntityReader {
     if (fieldsToRetrieve.contains(Field.ALL) ||
         fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
       readMetrics(entity, result, EntityColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
           entity.getMetrics(), metricFilters)) {
         return null;
       }
@@ -365,7 +364,7 @@ class GenericEntityReader extends TimelineEntityReader {
       // the column name is of the form "eventId=timestamp=infoKey"
       if (karr.length == 3) {
         String id = Bytes.toString(karr[0]);
-        long ts = TimelineWriterUtils.invert(Bytes.toLong(karr[1]));
+        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
         String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
         TimelineEvent event = eventsMap.get(key);
         if (event == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 7c4a5da..3649865 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -125,7 +125,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
       // if the entity is the application, the destination is the application
       // table
-      boolean isApplication = TimelineWriterUtils.isApplicationEntity(te);
+      boolean isApplication = TimelineStorageUtils.isApplicationEntity(te);
       byte[] rowKey = isApplication ?
           ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
               appId) :
@@ -139,7 +139,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       storeRelations(rowKey, te, isApplication);
 
       if (isApplication) {
-        if (TimelineWriterUtils.isApplicationCreated(te)) {
+        if (TimelineStorageUtils.isApplicationCreated(te)) {
           onApplicationCreated(clusterId, userId, flowName, flowVersion,
               flowRunId, appId, te);
         }
@@ -149,7 +149,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         // if application has finished, store it's finish time and write final
         // values
         // of all metrics
-        if (TimelineWriterUtils.isApplicationFinished(te)) {
+        if (TimelineStorageUtils.isApplicationFinished(te)) {
           onApplicationFinished(clusterId, userId, flowName, flowVersion,
               flowRunId, appId, te);
         }
@@ -234,7 +234,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
     Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
         .getAttribute(appId);
     FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
-        TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId);
+        TimelineStorageUtils.getApplicationFinishedTime(te), attributeAppId);
 
     // store the final value of metrics since application has finished
     Set<TimelineMetric> metrics = te.getMetrics();
@@ -406,9 +406,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
             }
             byte[] columnQualifierFirst =
                 Bytes.toBytes(Separator.VALUES.encode(eventId));
-            byte[] columnQualifierWithTsBytes =
-                Separator.VALUES.join(columnQualifierFirst,
-                    Bytes.toBytes(TimelineWriterUtils.invert(eventTimestamp)));
+            byte[] columnQualifierWithTsBytes = Separator.VALUES.
+                join(columnQualifierFirst, Bytes.toBytes(
+                    TimelineStorageUtils.invertLong(eventTimestamp)));
             Map<String, Object> eventInfo = event.getInfo();
             if ((eventInfo == null) || (eventInfo.size() == 0)) {
               // add separator since event key is empty
@@ -418,11 +418,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
               if (isApplication) {
                 ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable,
                     compoundColumnQualifierBytes, null,
-                      TimelineWriterUtils.EMPTY_BYTES);
+                      TimelineStorageUtils.EMPTY_BYTES);
               } else {
                 EntityColumnPrefix.EVENT.store(rowKey, entityTable,
                     compoundColumnQualifierBytes, null,
-                    TimelineWriterUtils.EMPTY_BYTES);
+                      TimelineStorageUtils.EMPTY_BYTES);
               }
             } else {
               for (Map.Entry<String, Object> info : eventInfo.entrySet()) {

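A sketch of how an event column qualifier is assembled after this change
(the event id and timestamp values are invented; Separator and
TimelineStorageUtils are the classes touched by this patch):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;

    public class EventQualifierSketch {
      public static void main(String[] args) {
        // First component: the encoded event id.
        byte[] first =
            Bytes.toBytes(Separator.VALUES.encode("YARN_APPLICATION_CREATED"));
        // Second component: the inverted event timestamp, so that scans
        // return the most recent events first.
        byte[] qualifier = Separator.VALUES.join(first,
            Bytes.toBytes(TimelineStorageUtils.invertLong(1451592000000L)));
        System.out.println(qualifier.length); // event id + separator + 8 bytes
      }
    }
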
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index 10e3c2e..1cf6145 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the application table.
@@ -90,7 +90,7 @@ public class ApplicationRowKey {
       String flowId, Long flowRunId) {
     byte[] first = Bytes.toBytes(
         Separator.QUALIFIERS.joinEncoded(clusterId, userId, flowId));
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
     return Separator.QUALIFIERS.join(first, second, new byte[0]);
   }
 
@@ -112,8 +112,8 @@ public class ApplicationRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third = Bytes.toBytes(appId);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
@@ -135,9 +135,8 @@ public class ApplicationRowKey {
     String flowId =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
     long flowRunId =
-        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
-    String appId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
     return new ApplicationRowKey(clusterId, userId, flowId, flowRunId, appId);
   }
 }

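With the encoded app id the application-table row key round-trips cleanly.
A short hypothetical sketch (ids invented; the parse method and accessor
names are assumed from the hunk above, not spelled out in it):

    import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;

    public class AppRowKeySketch {
      public static void main(String[] args) {
        // cluster!user!flow!inverted run id!encoded app id
        byte[] row = ApplicationRowKey.getRowKey("cluster1", "user1", "flow1",
            1002345678919L, "application_1451592000000_0001");
        // The encoded app id is fixed-width (12 bytes), so parsing the key
        // back recovers the original string form.
        ApplicationRowKey key = ApplicationRowKey.parseRowKey(row);
        System.out.println(key.getAppId()); // application_1451592000000_0001
      }
    }
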
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index ca88056..133952e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the app_flow table.
@@ -49,7 +50,9 @@ public class AppToFlowRowKey {
    * @return byte array with the row key
    */
   public static byte[] getRowKey(String clusterId, String appId) {
-    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
+    byte[] first = Bytes.toBytes(clusterId);
+    byte[] second = TimelineStorageUtils.encodeAppId(appId);
+    return Separator.QUALIFIERS.join(first, second);
   }
 
   /**
@@ -64,7 +67,7 @@ public class AppToFlowRowKey {
     }
 
     String clusterId = Bytes.toString(rowKeyComponents[0]);
-    String appId = Bytes.toString(rowKeyComponents[1]);
+    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[1]);
     return new AppToFlowRowKey(clusterId, appId);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
index 9f91af8..1e82494 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Separator.java
@@ -304,7 +304,7 @@ public enum Separator {
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source, int limit) {
-    return TimelineWriterUtils.split(source, this.bytes, limit);
+    return TimelineStorageUtils.split(source, this.bytes, limit);
   }
 
   /**
@@ -315,6 +315,6 @@ public enum Separator {
    * @return source split by this separator.
    */
   public byte[][] split(byte[] source) {
-    return TimelineWriterUtils.split(source, this.bytes);
+    return TimelineStorageUtils.split(source, this.bytes);
   }
 }

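The split() delegation above pairs with joinEncoded(); a small round-trip
sketch using only methods visible in this patch (values invented):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

    public class SeparatorRoundTrip {
      public static void main(String[] args) {
        // Encode-and-join two components, then split them back apart.
        byte[] row =
            Bytes.toBytes(Separator.QUALIFIERS.joinEncoded("cluster1", "user1"));
        byte[][] parts = Separator.QUALIFIERS.split(row);
        System.out.println(Bytes.toString(parts[0])); // cluster1
        System.out.println(Bytes.toString(parts[1])); // user1
      }
    }
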
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
deleted file mode 100644
index 91d7ba4..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-
-public class TimelineReaderUtils {
-  /**
-   *
-   * @param entityRelations the relations of an entity
-   * @param relationFilters the relations for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchRelations(
-      Map<String, Set<String>> entityRelations,
-      Map<String, Set<String>> relationFilters) {
-    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
-      Set<String> ids = entityRelations.get(relation.getKey());
-      if (ids == null) {
-        return false;
-      }
-      for (String id : relation.getValue()) {
-        if (!ids.contains(id)) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   *
-   * @param map the map of key/value pairs in an entity
-   * @param filters the map of key/value pairs for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchFilters(Map<String, ? extends Object> map,
-      Map<String, ? extends Object> filters) {
-    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
-      Object value = map.get(filter.getKey());
-      if (value == null) {
-        return false;
-      }
-      if (!value.equals(filter.getValue())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   *
-   * @param entityEvents the set of event objects in an entity
-   * @param eventFilters the set of event Ids for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
-      Set<String> eventFilters) {
-    Set<String> eventIds = new HashSet<String>();
-    for (TimelineEvent event : entityEvents) {
-      eventIds.add(event.getId());
-    }
-    for (String eventFilter : eventFilters) {
-      if (!eventIds.contains(eventFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   *
-   * @param metrics the set of metric objects in an entity
-   * @param metricFilters the set of metric Ids for filtering
-   * @return a boolean flag to indicate if both match
-   */
-  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
-      Set<String> metricFilters) {
-    Set<String> metricIds = new HashSet<String>();
-    for (TimelineMetric metric : metrics) {
-      metricIds.add(metric.getId());
-    }
-
-    for (String metricFilter : metricFilters) {
-      if (!metricIds.contains(metricFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
new file mode 100644
index 0000000..c1aaf19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * A bunch of utility functions used across TimelineReader and TimelineWriter.
+ */
+@Public
+@Unstable
+public class TimelineStorageUtils {
+
+  /** empty bytes */
+  public static final byte[] EMPTY_BYTES = new byte[0];
+
+  /** indicator for no limits for splitting */
+  public static final int NO_LIMIT_SPLIT = -1;
+
+  /** milliseconds in one day */
+  public static final long MILLIS_ONE_DAY = 86400000L;
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of count items. This will naturally produce
+   * copied byte arrays for each of the split segments. To identify the split
+   * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
+   *
+   * @param source
+   * @param separator
+   * @return byte[] array after splitting the source
+   */
+  public static byte[][] split(byte[] source, byte[] separator) {
+    return split(source, separator, NO_LIMIT_SPLIT);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of count items. This will naturally produce
+   * copied byte arrays for each of the split segments. To identify the split
+   * ranges without the array copies, see {@link #splitRanges(byte[], byte[])}.
+   *
+   * @param source
+   * @param separator
+   * @param limit a non-positive value indicates no limit on number of segments.
+   * @return byte[][] after splitting the input source
+   */
+  public static byte[][] split(byte[] source, byte[] separator, int limit) {
+    List<Range> segments = splitRanges(source, separator, limit);
+
+    byte[][] splits = new byte[segments.size()][];
+    for (int i = 0; i < segments.size(); i++) {
+      Range r = segments.get(i);
+      byte[] tmp = new byte[r.length()];
+      if (tmp.length > 0) {
+        System.arraycopy(source, r.start(), tmp, 0, r.length());
+      }
+      splits[i] = tmp;
+    }
+    return splits;
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator) {
+    return splitRanges(source, separator, NO_LIMIT_SPLIT);
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   *
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @param limit the maximum number of splits to identify in the source
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator,
+      int limit) {
+    List<Range> segments = new ArrayList<Range>();
+    if ((source == null) || (separator == null)) {
+      return segments;
+    }
+    int start = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        if (source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+      // all separator elements matched
+      if (limit > 0 && segments.size() >= (limit - 1)) {
+        // everything else goes in one final segment
+        break;
+      }
+      segments.add(new Range(start, i));
+      start = i + separator.length;
+      // i will be incremented again in outer for loop
+      i += separator.length - 1;
+    }
+    // add in remaining to a final range
+    if (start <= source.length) {
+      segments.add(new Range(start, source.length));
+    }
+    return segments;
+  }
+
+  /**
+   * Converts a timestamp into its inverse timestamp to be used in (row) keys
+   * where we want to have the most recent timestamp at the top of the table
+   * (scans start at the most recent timestamp first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted long
+   */
+  public static long invertLong(long key) {
+    return Long.MAX_VALUE - key;
+  }
+
+  /**
+   * Converts an int into its inverse int to be used in (row) keys
+   * where we want to have the largest int value at the top of the table
+   * (scans start at the largest int first).
+   *
+   * @param key value to be inverted so that the latest version will be first in
+   *          a scan.
+   * @return inverted int
+   */
+  public static int invertInt(int key) {
+    return Integer.MAX_VALUE - key;
+  }
+
+
+  /**
+   * Converts/encodes a string app Id into a byte representation for (row) keys.
+   * For conversion, we extract cluster timestamp and sequence id from the
+   * string app id (calls {@link ConverterUtils#toApplicationId(String)} for
+   * conversion) and then store it in a byte array of length 12 (8 bytes (long)
+   * for cluster timestamp followed by 4 bytes (int) for sequence id). Both
+   * cluster timestamp and sequence id are inverted so that the most recent
+   * cluster timestamp and highest sequence id appear first in the table
+   * (i.e. application ids appear in descending order).
+   *
+   * @param appIdStr application id in string format i.e.
+   * application_{cluster timestamp}_{sequence id with min 4 digits}
+   *
+   * @return encoded byte representation of app id.
+   */
+  public static byte[] encodeAppId(String appIdStr) {
+    ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+    byte[] appIdBytes = new byte[Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT];
+    byte[] clusterTs = Bytes.toBytes(invertLong(appId.getClusterTimestamp()));
+    System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG);
+    byte[] seqId = Bytes.toBytes(invertInt(appId.getId()));
+    System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT);
+    return appIdBytes;
+  }
+
+  /**
+   * Converts/decodes a 12 byte representation of app id for (row) keys to an
+   * app id in string format which can be returned back to client.
+   * For decoding, 12 bytes are interpreted as 8 bytes of inverted cluster
+   * timestamp(long) followed by 4 bytes of inverted sequence id(int). Calls
+   * {@link ApplicationId#toString} to generate string representation of app id.
+   *
+   * @param appIdBytes application id in byte representation.
+   *
+   * @return decoded app id in string format.
+   */
+  public static String decodeAppId(byte[] appIdBytes) {
+    if (appIdBytes.length != (Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT)) {
+      throw new IllegalArgumentException("Invalid app id in byte format");
+    }
+    long clusterTs = invertLong(Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG));
+    int seqId =
+        invertInt(Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT));
+    return ApplicationId.newInstance(clusterTs, seqId).toString();
+  }
+
+  /**
+   * Returns the timestamp of that day's start (midnight, i.e. 00:00:00) for
+   * a given input timestamp.
+   *
+   * @param ts
+   * @return timestamp of that day's beginning (midnight)
+   */
+  public static long getTopOfTheDayTimestamp(long ts) {
+    long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
+    return dayTimestamp;
+  }
+
+  /**
+   * Combines the input array of attributes and the input aggregation operation
+   * into a new array of attributes.
+   *
+   * @param attributes
+   * @param aggOp
+   * @return array of combined attributes
+   */
+  public static Attribute[] combineAttributes(Attribute[] attributes,
+      AggregationOperation aggOp) {
+    int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
+    Attribute[] combinedAttributes = new Attribute[newLength];
+
+    if (attributes != null) {
+      System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
+    }
+
+    if (aggOp != null) {
+      Attribute a2 = aggOp.getAttribute();
+      combinedAttributes[newLength - 1] = a2;
+    }
+    return combinedAttributes;
+  }
+
+  /**
+   * Returns a number for the new array size. The new array is the combination
+   * of input array of attributes and the input aggregation operation.
+   *
+   * @param attributes
+   * @param aggOp
+   * @return the size for the new array
+   */
+  private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+      AggregationOperation aggOp) {
+    int oldLength = getAttributesLength(attributes);
+    int aggLength = getAppOpLength(aggOp);
+    return oldLength + aggLength;
+  }
+
+  private static int getAppOpLength(AggregationOperation aggOp) {
+    if (aggOp != null) {
+      return 1;
+    }
+    return 0;
+  }
+
+  private static int getAttributesLength(Attribute[] attributes) {
+    if (attributes != null) {
+      return attributes.length;
+    }
+    return 0;
+  }
+
+  /**
+   * checks if an application has finished
+   *
+   * @param te
+   * @return true if application has finished else false
+   */
+  public static boolean isApplicationFinished(TimelineEntity te) {
+    SortedSet<TimelineEvent> allEvents = te.getEvents();
+    if ((allEvents != null) && (allEvents.size() > 0)) {
+      TimelineEvent event = allEvents.last();
+      if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Gets the time at which an application finished.
+   *
+   * @param te
+   * @return the timestamp of the application's FINISHED event, or 0 if the
+   *         application has not finished
+   */
+  public static long getApplicationFinishedTime(TimelineEntity te) {
+    SortedSet<TimelineEvent> allEvents = te.getEvents();
+    if ((allEvents != null) && (allEvents.size() > 0)) {
+      TimelineEvent event = allEvents.last();
+      if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+        return event.getTimestamp();
+      }
+    }
+    return 0L;
+  }
+
+  /**
+   * Checks if the input TimelineEntity object is an ApplicationEntity.
+   *
+   * @param te
+   * @return true if input is an ApplicationEntity, false otherwise
+   */
+  public static boolean isApplicationEntity(TimelineEntity te) {
+    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  }
+
+  /**
+   * Checks for the APPLICATION_CREATED event.
+   *
+   * @param te
+   * @return true if the application creation event exists, false otherwise
+   */
+  public static boolean isApplicationCreated(TimelineEntity te) {
+    if (isApplicationEntity(te)) {
+      for (TimelineEvent event : te.getEvents()) {
+        if (event.getId()
+            .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the first aggregation operation found in the list of input tags,
+   * or null if none is present.
+   *
+   * @param tags
+   * @return AggregationOperation
+   */
+  public static AggregationOperation getAggregationOperationFromTagsList(
+      List<Tag> tags) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      for (Tag tag : tags) {
+        if (tag.getType() == aggOp.getTagType()) {
+          return aggOp;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a {@link Tag} from the input attribute.
+   *
+   * @param attribute
+   * @return Tag
+   */
+  public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
+    // attribute could be either an Aggregation Operation or
+    // an Aggregation Dimension
+    // Get the Tag type from either
+    AggregationOperation aggOp = AggregationOperation
+        .getAggregationOperation(attribute.getKey());
+    if (aggOp != null) {
+      Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+      return t;
+    }
+
+    AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
+        .getAggregationCompactionDimension(attribute.getKey());
+    if (aggCompactDim != null) {
+      Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+      return t;
+    }
+    return null;
+  }
+
+  /**
+   *
+   * @param entityRelations the relations of an entity
+   * @param relationFilters the relations for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relationFilters) {
+    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+      if (ids == null) {
+        return false;
+      }
+      for (String id : relation.getValue()) {
+        if (!ids.contains(id)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param map the map of key/value pairs in an entity
+   * @param filters the map of key/value pairs for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchFilters(Map<String, ? extends Object> map,
+      Map<String, ? extends Object> filters) {
+    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+      Object value = map.get(filter.getKey());
+      if (value == null) {
+        return false;
+      }
+      if (!value.equals(filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param entityEvents the set of event objects in an entity
+   * @param eventFilters the set of event Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> eventIds = new HashSet<String>();
+    for (TimelineEvent event : entityEvents) {
+      eventIds.add(event.getId());
+    }
+    for (String eventFilter : eventFilters) {
+      if (!eventIds.contains(eventFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   *
+   * @param metrics the set of metric objects in an entity
+   * @param metricFilters the set of metric Ids for filtering
+   * @return a boolean flag to indicate if both match
+   */
+  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> metricIds = new HashSet<String>();
+    for (TimelineMetric metric : metrics) {
+      metricIds.add(metric.getId());
+    }
+
+    for (String metricFilter : metricFilters) {
+      if (!metricIds.contains(metricFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

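What the new encoding buys, as a hedged sketch (app ids invented; only
methods defined in the file above are used): the 12-byte key sorts higher
app ids first and decodes back to the canonical string form.

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;

    public class EncodeAppIdSketch {
      public static void main(String[] args) {
        byte[] k9999 =
            TimelineStorageUtils.encodeAppId("application_1451592000000_9999");
        byte[] k10000 =
            TimelineStorageUtils.encodeAppId("application_1451592000000_10000");
        // Sequence ids are stored inverted, so the higher id sorts first;
        // the raw string ordering this patch removes got that wrong.
        System.out.println(Bytes.compareTo(k10000, k9999) < 0); // true
        // And the encoding round-trips back to the string form.
        System.out.println(TimelineStorageUtils.decodeAppId(k9999));
        // -> application_1451592000000_9999
      }
    }
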
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
deleted file mode 100644
index 371371a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
-
-/**
- * bunch of utility functions used across TimelineWriter classes
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineWriterUtils {
-
-  /** empty bytes */
-  public static final byte[] EMPTY_BYTES = new byte[0];
-
-  /** indicator for no limits for splitting */
-  public static final int NO_LIMIT_SPLIT = -1;
-
-  /** milliseconds in one day */
-  public static final long MILLIS_ONE_DAY = 86400000L;
-
-  /**
-   * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of count items. This will naturally produce
-   * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
-   *
-   * @param source
-   * @param separator
-   * @return byte[] array after splitting the source
-   */
-  public static byte[][] split(byte[] source, byte[] separator) {
-    return split(source, separator, NO_LIMIT_SPLIT);
-  }
-
-  /**
-   * Splits the source array into multiple array segments using the given
-   * separator, up to a maximum of count items. This will naturally produce
-   * copied byte arrays for each of the split segments. To identify the split
-   * ranges without the array copies, see
-   * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}.
-   *
-   * @param source
-   * @param separator
-   * @param limit a non-positive value indicates no limit on number of segments.
-   * @return byte[][] after splitting the input source
-   */
-  public static byte[][] split(byte[] source, byte[] separator, int limit) {
-    List<Range> segments = splitRanges(source, separator, limit);
-
-    byte[][] splits = new byte[segments.size()][];
-    for (int i = 0; i < segments.size(); i++) {
-      Range r = segments.get(i);
-      byte[] tmp = new byte[r.length()];
-      if (tmp.length > 0) {
-        System.arraycopy(source, r.start(), tmp, 0, r.length());
-      }
-      splits[i] = tmp;
-    }
-    return splits;
-  }
-
-  /**
-   * Returns a list of ranges identifying [start, end) -- closed, open --
-   * positions within the source byte array that would be split using the
-   * separator byte array.
-   */
-  public static List<Range> splitRanges(byte[] source, byte[] separator) {
-    return splitRanges(source, separator, NO_LIMIT_SPLIT);
-  }
-
-  /**
-   * Returns a list of ranges identifying [start, end) -- closed, open --
-   * positions within the source byte array that would be split using the
-   * separator byte array.
-   *
-   * @param source the source data
-   * @param separator the separator pattern to look for
-   * @param limit the maximum number of splits to identify in the source
-   */
-  public static List<Range> splitRanges(byte[] source, byte[] separator,
-      int limit) {
-    List<Range> segments = new ArrayList<Range>();
-    if ((source == null) || (separator == null)) {
-      return segments;
-    }
-    int start = 0;
-    itersource: for (int i = 0; i < source.length; i++) {
-      for (int j = 0; j < separator.length; j++) {
-        if (i + j >= source.length || source[i + j] != separator[j]) {
-          continue itersource;
-        }
-      }
-      // all separator elements matched
-      if (limit > 0 && segments.size() >= (limit - 1)) {
-        // everything else goes in one final segment
-        break;
-      }
-
-      segments.add(new Range(start, i));
-      start = i + separator.length;
-      // i will be incremented again in outer for loop
-      i += separator.length - 1;
-    }
-    // add in remaining to a final range
-    if (start <= source.length) {
-      segments.add(new Range(start, source.length));
-    }
-    return segments;
-  }
-
-  /**
-   * Converts a timestamp into its inverted timestamp, to be used in (row)
-   * keys where we want the most recent timestamp at the top of the table
-   * (scans start at the most recent timestamp first).
-   *
-   * @param key value to be inverted so that the latest version will be first
-   *          in a scan.
-   * @return inverted long
-   */
-  public static long invert(Long key) {
-    return Long.MAX_VALUE - key;
-  }
-
-  /**
-   * Returns the timestamp of the start (midnight, 00:00:00 UTC) of the day
-   * that contains the given input timestamp.
-   *
-   * @param ts the input timestamp in milliseconds
-   * @return timestamp of that day's beginning (midnight)
-   */
-  public static long getTopOfTheDayTimestamp(long ts) {
-    long dayTimestamp = ts - (ts % MILLIS_ONE_DAY);
-    return dayTimestamp;
-  }
-
-  /**
-   * Combines the input array of attributes and the input aggregation operation
-   * into a new array of attributes.
-   *
-   * @param attributes the existing attributes, may be null
-   * @param aggOp the aggregation operation to append, may be null
-   * @return array of combined attributes
-   */
-  public static Attribute[] combineAttributes(Attribute[] attributes,
-      AggregationOperation aggOp) {
-    int newLength = getNewLengthCombinedAttributes(attributes, aggOp);
-    Attribute[] combinedAttributes = new Attribute[newLength];
-
-    if (attributes != null) {
-      System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length);
-    }
-
-    if (aggOp != null) {
-      Attribute a2 = aggOp.getAttribute();
-      combinedAttributes[newLength - 1] = a2;
-    }
-    return combinedAttributes;
-  }
-
-  /**
-   * Returns a number for the new array size. The new array is the combination
-   * of input array of attributes and the input aggregation operation.
-   *
-   * @param attributes the existing attributes, may be null
-   * @param aggOp the aggregation operation, may be null
-   * @return the size for the new array
-   */
-  private static int getNewLengthCombinedAttributes(Attribute[] attributes,
-      AggregationOperation aggOp) {
-    int oldLength = getAttributesLength(attributes);
-    int aggLength = getAppOpLength(aggOp);
-    return oldLength + aggLength;
-  }
-
-  private static int getAppOpLength(AggregationOperation aggOp) {
-    if (aggOp != null) {
-      return 1;
-    }
-    return 0;
-  }
-
-  private static int getAttributesLength(Attribute[] attributes) {
-    if (attributes != null) {
-      return attributes.length;
-    }
-    return 0;
-  }
-
-  /**
-   * Checks if an application has finished.
-   *
-   * @param te the TimelineEntity to check
-   * @return true if the application has finished, false otherwise
-   */
-  public static boolean isApplicationFinished(TimelineEntity te) {
-    SortedSet<TimelineEvent> allEvents = te.getEvents();
-    if ((allEvents != null) && (allEvents.size() > 0)) {
-      TimelineEvent event = allEvents.last();
-      if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Gets the time at which an application finished.
-   *
-   * @param te the TimelineEntity to check
-   * @return the finish time if the application has finished, 0 otherwise
-   */
-  public static long getApplicationFinishedTime(TimelineEntity te) {
-    SortedSet<TimelineEvent> allEvents = te.getEvents();
-    if ((allEvents != null) && (allEvents.size() > 0)) {
-      TimelineEvent event = allEvents.last();
-      if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
-        return event.getTimestamp();
-      }
-    }
-    return 0L;
-  }
-
-  /**
-   * Checks if the input TimelineEntity object is an ApplicationEntity.
-   *
-   * @param te the TimelineEntity to check
-   * @return true if input is an ApplicationEntity, false otherwise
-   */
-  public static boolean isApplicationEntity(TimelineEntity te) {
-    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
-  }
-
-  /**
-   * Checks for the APPLICATION_CREATED event.
-   *
-   * @param te the TimelineEntity to check
-   * @return true if the application created event exists, false otherwise
-   */
-  public static boolean isApplicationCreated(TimelineEntity te) {
-    if (isApplicationEntity(te)) {
-      for (TimelineEvent event : te.getEvents()) {
-        if (event.getId()
-            .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Returns the first aggregation operation found in the list of input tags,
-   * or null if none is present.
-   *
-   * @param tags the list of HBase tags to inspect
-   * @return the matching AggregationOperation, or null
-   */
-  public static AggregationOperation getAggregationOperationFromTagsList(
-      List<Tag> tags) {
-    for (AggregationOperation aggOp : AggregationOperation.values()) {
-      for (Tag tag : tags) {
-        if (tag.getType() == aggOp.getTagType()) {
-          return aggOp;
-        }
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Creates a {@link Tag} from the input attribute.
-   *
-   * @param attribute the attribute entry to convert
-   * @return the corresponding Tag, or null if it matches neither tag type
-   */
-  public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
-    // attribute could be either an Aggregation Operation or
-    // an Aggregation Dimension
-    // Get the Tag type from either
-    AggregationOperation aggOp = AggregationOperation
-        .getAggregationOperation(attribute.getKey());
-    if (aggOp != null) {
-      Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
-      return t;
-    }
-
-    AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension
-        .getAggregationCompactionDimension(attribute.getKey());
-    if (aggCompactDim != null) {
-      Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
-      return t;
-    }
-    return null;
-  }
-
-}
\ No newline at end of file

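For context on what is moving: the split()/splitRanges() helpers deleted
above (re-homed by this patch in TimelineStorageUtils) implement
separator-based splitting in which, once the limit is reached, the rest of
the source is kept as one final segment. A minimal, self-contained sketch
of those semantics, assuming a single-byte separator for brevity (the class
and method names below are illustrative only, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Simplified port of split()/splitRanges(): with limit = 2, everything
    // after the first separator match stays in one final segment.
    public class SplitSketch {

      static byte[][] split(byte[] source, byte[] sep, int limit) {
        List<byte[]> out = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < source.length; i++) {
          if (source[i] == sep[0]) {  // single-byte separator assumed
            if (limit > 0 && out.size() >= limit - 1) {
              break;  // everything else goes into one final segment
            }
            out.add(Arrays.copyOfRange(source, start, i));
            start = i + 1;
          }
        }
        out.add(Arrays.copyOfRange(source, start, source.length));
        return out.toArray(new byte[0][]);
      }

      public static void main(String[] args) {
        byte[][] parts = split("a!b!c".getBytes(), "!".getBytes(), 2);
        System.out.println(new String(parts[0]));  // prints "a"
        System.out.println(new String(parts[1]));  // prints "b!c"
      }
    }
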
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 6a534ed73..e0413c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the entity table.
@@ -90,9 +90,9 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId));
-    return Separator.QUALIFIERS.join(first, second, third);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
+    return Separator.QUALIFIERS.join(first, second, third, new byte[0]);
   }
 
   /**
@@ -114,10 +114,11 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, ""));
-    return Separator.QUALIFIERS.join(first, second, third);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
+    byte[] fourth =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, ""));
+    return Separator.QUALIFIERS.join(first, second, third, fourth);
   }
 
   /**
@@ -141,11 +142,11 @@ public class EntityRowKey {
             flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
-    byte[] third =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
-            entityId));
-    return Separator.QUALIFIERS.join(first, second, third);
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
+    byte[] third = TimelineStorageUtils.encodeAppId(appId);
+    byte[] fourth =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(entityType, entityId));
+    return Separator.QUALIFIERS.join(first, second, third, fourth);
   }
 
   /**
@@ -166,9 +167,8 @@ public class EntityRowKey {
     String flowId =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
     long flowRunId =
-        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
-    String appId =
-        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[4]));
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
+    String appId = TimelineStorageUtils.decodeAppId(rowKeyComponents[4]);
     String entityType =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[5]));
     String entityId =

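The encodeAppId() calls above are the heart of this change: as a string,
"application_1234_10" sorts before "application_1234_9" under lexicographic
row-key comparison, even though it is the newer application. Encoding the
cluster timestamp and sequence number as fixed-width big-endian bytes makes
byte-wise comparison agree with numeric order. A sketch of the idea in
plain Java; the real TimelineStorageUtils.encodeAppId is not shown in this
hunk and may differ (for example, it may also invert the fields):

    import java.nio.ByteBuffer;

    public class AppIdEncodingSketch {

      // Encodes "application_<clusterTimestamp>_<sequence>" as 12 bytes:
      // an 8-byte long followed by a 4-byte int, both big-endian.
      static byte[] encode(String appId) {
        String[] parts = appId.split("_");
        return ByteBuffer.allocate(12)
            .putLong(Long.parseLong(parts[1]))
            .putInt(Integer.parseInt(parts[2]))
            .array();
      }

      // Unsigned byte-wise comparison, the order HBase uses for row keys.
      static int compareBytes(byte[] a, byte[] b) {
        for (int i = 0; i < a.length; i++) {
          int d = (a[i] & 0xff) - (b[i] & 0xff);
          if (d != 0) {
            return d;
          }
        }
        return 0;
      }

      public static void main(String[] args) {
        String app9 = "application_1234_9";
        String app10 = "application_1234_10";
        // String order is wrong (app9 compares greater than app10) ...
        System.out.println(app9.compareTo(app10) > 0);  // true
        // ... while the fixed-width encoding orders them numerically.
        System.out.println(compareBytes(encode(app9), encode(app10)) < 0);  // true
      }
    }
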
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
index b899e5c..38c0f3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 
 /**
@@ -114,7 +114,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
@@ -235,7 +235,7 @@ public enum FlowActivityColumnPrefix implements ColumnPrefix<FlowActivityTable>
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, null, inputValue,
         combinedAttributes);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 18ca599..f7841e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the flow activity table.
@@ -71,7 +71,7 @@ public class FlowActivityRowKey {
    */
   public static byte[] getRowKey(String clusterId, String userId,
       String flowId) {
-    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
         .currentTimeMillis());
     return getRowKey(clusterId, dayTs, userId, flowId);
   }
@@ -90,7 +90,7 @@ public class FlowActivityRowKey {
       String flowId) {
     return Separator.QUALIFIERS.join(
         Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
-        Bytes.toBytes(TimelineWriterUtils.invert(dayTs)),
+        Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)),
         Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
         Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
   }
@@ -108,7 +108,8 @@ public class FlowActivityRowKey {
 
     String clusterId = Separator.QUALIFIERS.decode(Bytes
         .toString(rowKeyComponents[0]));
-    long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
+    long dayTs =
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[1]));
     String userId = Separator.QUALIFIERS.decode(Bytes
         .toString(rowKeyComponents[2]));
     String flowId = Separator.QUALIFIERS.decode(Bytes

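FlowActivityRowKey combines two of the helpers above: the timestamp is first
truncated to the top of the day and then inverted, so the single row per
flow per day sorts newest-day-first. A tiny self-contained illustration
(the inversion helper is duplicated here so the snippet runs on its own):

    public class DayKeyOrderSketch {

      static final long MILLIS_ONE_DAY = 86400000L;

      // Same inversion as TimelineStorageUtils.invertLong().
      static long invertLong(long key) {
        return Long.MAX_VALUE - key;
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long today = now - (now % MILLIS_ONE_DAY);  // top of the day (UTC)
        long yesterday = today - MILLIS_ONE_DAY;
        // HBase scans rows in ascending byte order, so the smaller inverted
        // value (today's) comes back before yesterday's.
        System.out.println(invertLong(today) < invertLong(yesterday));  // true
      }
    }
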
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index ad30add..5079cc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 
 /**
@@ -97,7 +97,7 @@ public enum FlowRunColumn implements Column<FlowRunTable> {
       TypedBufferedMutator<FlowRunTable> tableMutator, Long timestamp,
       Object inputValue, Attribute... attributes) throws IOException {
 
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, aggOp);
     column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
         inputValue, combinedAttributes);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index d55f510..b090bba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 
 /**
@@ -112,7 +112,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);
@@ -140,7 +140,7 @@ public enum FlowRunColumnPrefix implements ColumnPrefix<FlowRunTable> {
 
     byte[] columnQualifier = ColumnHelper.getColumnQualifier(
         this.columnPrefixBytes, qualifier);
-    Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes(
+    Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes(
         attributes, this.aggOp);
     column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue,
         combinedAttributes);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index f743e5e..1984157 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
 
 public class FlowRunCoprocessor extends BaseRegionObserver {
@@ -89,7 +89,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
     List<Tag> tags = new ArrayList<>();
     if ((attributes != null) && (attributes.size() > 0)) {
       for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
-        Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
+        Tag t = TimelineStorageUtils.getTagFromAttribute(attribute);
         tags.add(t);
       }
       byte[] tagByteArray = Tag.fromList(tags);

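The coprocessor path above carries the aggregation metadata as HBase cell
tags. A short sketch of the attribute-to-tag conversion using only the APIs
visible in this hunk (the attribute name and the tag-type byte below are
hypothetical stand-ins for the values AggregationOperation defines):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.Tag;

    public class AttributeTagSketch {

      public static void main(String[] args) {
        Map<String, byte[]> attributes = new TreeMap<>();
        attributes.put("SUM", new byte[] {1});  // hypothetical attribute

        List<Tag> tags = new ArrayList<>();
        for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
          // The real code calls TimelineStorageUtils.getTagFromAttribute()
          // to pick the tag type; a fixed byte is used here to illustrate.
          tags.add(new Tag((byte) 70, attribute.getValue()));
        }
        byte[] tagByteArray = Tag.fromList(tags);  // serialized, cell-ready
        System.out.println(tagByteArray.length > 0);
      }
    }
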
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 880d481..7ed3651 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Represents a rowkey for the flow run table.
@@ -70,7 +70,7 @@ public class FlowRunRowKey {
         userId, flowId));
     // Note that flowRunId is a long, so we can't encode them all at the same
     // time.
-    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(flowRunId));
     return Separator.QUALIFIERS.join(first, second);
   }
 
@@ -92,7 +92,7 @@ public class FlowRunRowKey {
     String flowId =
         Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
     long flowRunId =
-        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+        TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
     return new FlowRunRowKey(clusterId, userId, flowId, flowRunId);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 651bb3a..a537891 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
  * Invoked via the coprocessor when a Get or a Scan is issued for flow run
@@ -136,7 +136,7 @@ class FlowScanner implements RegionScanner, Closeable {
     // So all cells in one qualifier come one after the other before we see the
     // next column qualifier
     ByteArrayComparator comp = new ByteArrayComparator();
-    byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
+    byte[] currentColumnQualifier = TimelineStorageUtils.EMPTY_BYTES;
     AggregationOperation currentAggOp = null;
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     Set<String> alreadySeenAggDim = new HashSet<>();
@@ -163,7 +163,7 @@ class FlowScanner implements RegionScanner, Closeable {
     List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
         cell.getTagsLength());
     // We assume that all the operations for a particular column are the same
-    return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
+    return TimelineStorageUtils.getAggregationOperationFromTagsList(tags);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdaa1e4e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 3b0921b..701615e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
@@ -366,7 +366,8 @@ public class TestHBaseTimelineStorage {
       String flow = "some_flow_name";
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
-      String appName = "some app name";
+      String appName =
+          ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
       hbi.stop();
 
@@ -592,7 +593,8 @@ public class TestHBaseTimelineStorage {
         byte[][] karr = (byte[][])e.getKey();
         assertEquals(3, karr.length);
         assertEquals(eventId, Bytes.toString(karr[0]));
-        assertEquals(TimelineWriterUtils.invert(expTs), Bytes.toLong(karr[1]));
+        assertEquals(
+            TimelineStorageUtils.invertLong(expTs), Bytes.toLong(karr[1]));
         assertEquals(expKey, Bytes.toString(karr[2]));
         Object value = e.getValue();
         // there should be only one timestamp and value
@@ -667,7 +669,8 @@ public class TestHBaseTimelineStorage {
       String flow = "other_flow_name";
       String flowVersion = "1111F01C2287BA";
       long runid = 1009876543218L;
-      String appName = "some app name";
+      String appName =
+          ApplicationId.newInstance(System.currentTimeMillis(), 1).toString();
       byte[] startRow =
           EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName);
       hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
@@ -700,7 +703,7 @@ public class TestHBaseTimelineStorage {
             byte[][] karr = (byte[][])e.getKey();
             assertEquals(3, karr.length);
             assertEquals(eventId, Bytes.toString(karr[0]));
-            assertEquals(TimelineWriterUtils.invert(expTs),
+            assertEquals(TimelineStorageUtils.invertLong(expTs),
                 Bytes.toLong(karr[1]));
             // key must be empty
             assertEquals(0, karr[2].length);