You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by va...@apache.org on 2016/03/04 15:16:14 UTC

hadoop git commit: YARN-4700. ATS storage has one extra record each time the RM got restarted. (Naganarasimha G R via Varun Saxena)

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 d491ef080 -> 85513ea85


YARN-4700. ATS storage has one extra record each time the RM got restarted. (Naganarasimha G R via Varun Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/85513ea8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/85513ea8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/85513ea8

Branch: refs/heads/YARN-2928
Commit: 85513ea8596d5690601123f3b780886896e5bbdf
Parents: d491ef0
Author: Varun Saxena <va...@apache.org>
Authored: Fri Mar 4 19:42:22 2016 +0530
Committer: Varun Saxena <va...@apache.org>
Committed: Fri Mar 4 19:42:22 2016 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../storage/HBaseTimelineWriterImpl.java        | 47 +++++++++--------
 .../storage/common/TimelineStorageUtils.java    | 35 +++----------
 .../storage/flow/FlowActivityRowKey.java        | 27 +++-------
 ...stTimelineReaderWebServicesHBaseStorage.java | 25 +++++----
 .../storage/flow/TestFlowDataGenerator.java     | 22 ++++----
 .../flow/TestHBaseStorageFlowActivity.java      | 53 ++++++++++++--------
 .../storage/flow/TestHBaseStorageFlowRun.java   |  4 +-
 8 files changed, 99 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4c77b67..4b7fd2c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -235,6 +235,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4409. Fix javadoc and checkstyle issues in timelineservice code (Varun
     Saxena via sjlee)
 
+    YARN-4700. ATS storage has one extra record each time the RM got restarted.
+    (Naganarasimha G R via varunsaxena)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 997b175..1afe878 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
@@ -53,11 +54,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
@@ -140,19 +141,22 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
       storeRelations(rowKey, te, isApplication);
 
       if (isApplication) {
-        if (TimelineStorageUtils.isApplicationCreated(te)) {
+        TimelineEvent event = TimelineStorageUtils.getApplicationEvent(te,
+            ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+        if (event != null) {
           onApplicationCreated(clusterId, userId, flowName, flowVersion,
-              flowRunId, appId, te);
+              flowRunId, appId, te, event.getTimestamp());
         }
         // if it's an application entity, store metrics
         storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
             appId, te);
         // if application has finished, store it's finish time and write final
-        // values
-        // of all metrics
-        if (TimelineStorageUtils.isApplicationFinished(te)) {
+        // values of all metrics
+        event = TimelineStorageUtils.getApplicationEvent(te,
+            ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+        if (event != null) {
           onApplicationFinished(clusterId, userId, flowName, flowVersion,
-              flowRunId, appId, te);
+              flowRunId, appId, te, event.getTimestamp());
         }
       }
     }
@@ -161,7 +165,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
 
   private void onApplicationCreated(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te) throws IOException {
+      TimelineEntity te, long appCreatedTimeStamp) throws IOException {
     // store in App to flow table
     storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te);
     // store in flow run table
@@ -169,7 +173,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
         flowRunId, appId, te);
     // store in flow activity table
     storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, te);
+        flowRunId, appId, appCreatedTimeStamp);
   }
 
   /*
@@ -178,8 +182,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
    */
   private void storeInFlowActivityTable(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te) throws IOException {
-    byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName);
+      long activityTimeStamp) throws IOException {
+    byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp,
+        userId, flowName);
     byte[] qualifier = GenericObjectMapper.write(flowRunId);
     FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
         null, flowVersion,
@@ -214,28 +219,28 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
    */
   private void onApplicationFinished(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te) throws IOException {
+      TimelineEntity te, long appFinishedTimeStamp) throws IOException {
     // store in flow run table
     storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId,
-        appId, te);
+        appId, te, appFinishedTimeStamp);
 
     // indicate in the flow activity table that the app has finished
     storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, te);
+        flowRunId, appId, appFinishedTimeStamp);
   }
 
   /*
    * Update the {@link FlowRunTable} with Application Finished information
    */
   private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
-      String flowName, long flowRunId, String appId, TimelineEntity te)
-      throws IOException {
-    byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
-        flowRunId);
-    Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID
-        .getAttribute(appId);
+      String flowName, long flowRunId, String appId, TimelineEntity te,
+      long appFinishedTimeStamp) throws IOException {
+    byte[] rowKey =
+        FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
+    Attribute attributeAppId =
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
     FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
-        TimelineStorageUtils.getApplicationFinishedTime(te), attributeAppId);
+        appFinishedTimeStamp, attributeAppId);
 
     // store the final value of metrics since application has finished
     Set<TimelineMetric> metrics = te.getMetrics();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 2328bba..605dbe7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -308,24 +308,6 @@ public final class TimelineStorageUtils {
   }
 
   /**
-   * get the time at which an app finished.
-   *
-   * @param te TimelineEntity object.
-   * @return true if application has finished else false
-   */
-  public static long getApplicationFinishedTime(TimelineEntity te) {
-    SortedSet<TimelineEvent> allEvents = te.getEvents();
-    if ((allEvents != null) && (allEvents.size() > 0)) {
-      TimelineEvent event = allEvents.last();
-      if (event.getId().equals(
-          ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
-        return event.getTimestamp();
-      }
-    }
-    return 0L;
-  }
-
-  /**
    * Checks if the input TimelineEntity object is an ApplicationEntity.
    *
    * @param te TimelineEntity object.
@@ -336,21 +318,20 @@ public final class TimelineStorageUtils {
   }
 
   /**
-   * Checks for the APPLICATION_CREATED event.
-   *
    * @param te TimelineEntity object.
-   * @return true is application event exists, false otherwise
+   * @param eventId event with this id needs to be fetched
+   * @return TimelineEvent if TimelineEntity contains the desired event.
    */
-  public static boolean isApplicationCreated(TimelineEntity te) {
+  public static TimelineEvent getApplicationEvent(TimelineEntity te,
+      String eventId) {
     if (isApplicationEntity(te)) {
       for (TimelineEvent event : te.getEvents()) {
-        if (event.getId()
-            .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
-          return true;
+        if (event.getId().equals(eventId)) {
+          return event;
         }
       }
     }
-    return false;
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 80b3287..2726ae2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -83,36 +83,21 @@ public class FlowActivityRowKey {
   /**
    * Constructs a row key for the flow activity table as follows:
    * {@code clusterId!dayTimestamp!user!flowName}.
-   * Will insert into current day's record in the table. Uses current time to
-   * store top of the day timestamp.
    *
    * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName) {
-    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-        .currentTimeMillis());
-    return getRowKey(clusterId, dayTs, userId, flowName);
-  }
-
-  /**
-   * Constructs a row key for the flow activity table as follows:
-   * {@code clusterId!dayTimestamp!user!flowName}.
-   *
-   * @param clusterId Cluster Id.
-   * @param dayTs Top of the day timestamp.
+   * @param eventTs event's TimeStamp.
    * @param userId User Id.
    * @param flowName Flow Name.
    * @return byte array for the row key
    */
-  public static byte[] getRowKey(String clusterId, long dayTs, String userId,
+  public static byte[] getRowKey(String clusterId, long eventTs, String userId,
       String flowName) {
+    // convert it to Day's time stamp
+    eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
+
     return Separator.QUALIFIERS.join(
         Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
-        Bytes.toBytes(TimelineStorageUtils.invertLong(dayTs)),
+        Bytes.toBytes(TimelineStorageUtils.invertLong(eventTs)),
         Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
         Bytes.toBytes(Separator.QUALIFIERS.encode(flowName)));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
index 9eaa3de..009b488 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java
@@ -62,8 +62,8 @@ import org.junit.Test;
 
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.GenericType;
 import com.sun.jersey.api.client.ClientResponse.Status;
+import com.sun.jersey.api.client.GenericType;
 import com.sun.jersey.api.client.config.ClientConfig;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
@@ -128,15 +128,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
 
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    Long expTs = 1436512802000L;
-    event.setTimestamp(expTs);
+    event.setTimestamp(cTime);
     String expKey = "foo_event";
     Object expVal = "test";
     event.addInfo(expKey, expVal);
     entity.addEvent(event);
     TimelineEvent event11 = new TimelineEvent();
     event11.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    expTs = 1436512802010L;
+    Long expTs = 1425019501000L;
     event11.setTimestamp(expTs);
     entity.addEvent(event11);
 
@@ -165,7 +164,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     entity1.addMetrics(metrics);
     TimelineEvent event1 = new TimelineEvent();
     event1.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event1.setTimestamp(expTs);
+    event1.setTimestamp(cTime);
     event1.addInfo(expKey, expVal);
     entity1.addEvent(event1);
     te1.addEntity(entity1);
@@ -182,7 +181,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     entity3.setCreatedTime(cTime);
     TimelineEvent event2 = new TimelineEvent();
     event2.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event2.setTimestamp(1436512802037L);
+    event2.setTimestamp(cTime);
     event2.addInfo("foo_event", "test");
     entity3.addEvent(event2);
     te3.addEntity(entity3);
@@ -196,7 +195,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
     entity4.setCreatedTime(cTime);
     TimelineEvent event4 = new TimelineEvent();
     event4.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event4.setTimestamp(1436512802037L);
+    event4.setTimestamp(cTime);
     event4.addInfo("foo_event", "test");
     entity4.addEvent(event4);
     te4.addEntity(entity4);
@@ -785,10 +784,14 @@ public class TestTimelineReaderWebServicesHBaseStorage {
       assertNotNull(entities);
       assertEquals(1, entities.size());
 
+      long firstFlowActivity =
+          TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L);
+
       DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get();
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
-          "timeline/clusters/cluster1/flows?daterange=" + fmt.format(dayTs) +
-          "-" + fmt.format(dayTs + (2*86400000L)));
+          "timeline/clusters/cluster1/flows?daterange="
+          + fmt.format(firstFlowActivity) + "-"
+          + fmt.format(dayTs));
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
       assertNotNull(entities);
@@ -810,7 +813,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/clusters/cluster1/flows?daterange=-" +
-          fmt.format(dayTs + (2*86400000L)));
+          fmt.format(dayTs));
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
       assertNotNull(entities);
@@ -818,7 +821,7 @@ public class TestTimelineReaderWebServicesHBaseStorage {
 
       uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
           "timeline/clusters/cluster1/flows?daterange=" +
-          fmt.format(dayTs - (2*86400000L)) + "-");
+           fmt.format(firstFlowActivity) + "-");
       resp = getResponse(client, uri);
       entities = resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
       assertNotNull(entities);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index a4c06f2..d45df57 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -103,8 +103,7 @@ class TestFlowDataGenerator {
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    long cTime = 20000000000000L;
-    long mTime = 1425026901000L;
+    long cTime = 1425026901000L;
     entity.setCreatedTime(cTime);
     // add metrics
     Set<TimelineMetric> metrics = new HashSet<>();
@@ -125,8 +124,7 @@ class TestFlowDataGenerator {
 
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    long expTs = 1436512802000L;
-    event.setTimestamp(expTs);
+    event.setTimestamp(cTime);
     String expKey = "foo_event";
     Object expVal = "test";
     event.addInfo(expKey, expVal);
@@ -134,7 +132,8 @@ class TestFlowDataGenerator {
 
     event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    event.setTimestamp(1436512801000L);
+    long expTs = cTime + 21600000;// start time + 6hrs
+    event.setTimestamp(expTs);
     event.addInfo(expKey, expVal);
     entity.addEvent(event);
 
@@ -149,8 +148,7 @@ class TestFlowDataGenerator {
     entity.setType(type);
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    long endTs = 1439379885000L;
-    event.setTimestamp(endTs);
+    event.setTimestamp(startTs);
     String expKey = "foo_event_greater";
     String expVal = "test_app_greater";
     event.addInfo(expKey, expVal);
@@ -181,25 +179,23 @@ class TestFlowDataGenerator {
     entity.setCreatedTime(startTs);
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    event.setTimestamp(System.currentTimeMillis());
+    event.setTimestamp(startTs);
     entity.addEvent(event);
     return entity;
   }
 
 
-  static TimelineEntity getFlowApp1() {
+  static TimelineEntity getFlowApp1(long appCreatedTime) {
     TimelineEntity entity = new TimelineEntity();
     String id = "flowActivity_test";
     String type = TimelineEntityType.YARN_APPLICATION.toString();
     entity.setId(id);
     entity.setType(type);
-    long cTime = 1425016501000L;
-    entity.setCreatedTime(cTime);
+    entity.setCreatedTime(appCreatedTime);
 
     TimelineEvent event = new TimelineEvent();
     event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    long expTs = 1436512802000L;
-    event.setTimestamp(expTs);
+    event.setTimestamp(appCreatedTime);
     String expKey = "foo_event";
     Object expVal = "test";
     event.addInfo(expKey, expVal);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
index 9161902..6b23b6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -112,9 +112,9 @@ public class TestHBaseStorageFlowActivity {
     String flowVersion = "CF7022C10F1354";
     long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
-    long minStartTs = 10000000000000L;
-    long greaterStartTs = 30000000000000L;
-    long endTs = 1439750690000L;
+    long minStartTs = 1424995200300L;
+    long greaterStartTs = 1424995200300L + 864000L;
+    long endTs = 1424995200300L + 86000000L;;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator
         .getEntityMinStartTime(minStartTs);
 
@@ -155,7 +155,8 @@ public class TestHBaseStorageFlowActivity {
     // check in flow activity table
     Table table1 = conn.getTable(TableName
         .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
-    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    byte[] startRow =
+        FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow);
     Get g = new Get(startRow);
     Result r1 = table1.get(g);
     assertNotNull(r1);
@@ -169,8 +170,7 @@ public class TestHBaseStorageFlowActivity {
     assertEquals(cluster, flowActivityRowKey.getClusterId());
     assertEquals(user, flowActivityRowKey.getUserId());
     assertEquals(flow, flowActivityRowKey.getFlowName());
-    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-        .currentTimeMillis());
+    long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs);
     assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
     assertEquals(1, values.size());
     checkFlowActivityRunId(runid, flowVersion, values);
@@ -216,7 +216,9 @@ public class TestHBaseStorageFlowActivity {
     long runid = 1001111178919L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+    long appCreatedTime = 1425016501000L;
+    TimelineEntity entityApp1 =
+        TestFlowDataGenerator.getFlowApp1(appCreatedTime);
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -231,7 +233,8 @@ public class TestHBaseStorageFlowActivity {
       hbi.close();
     }
     // check flow activity
-    checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+    checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1,
+        appCreatedTime);
 
     // use the reader to verify the data
     HBaseTimelineReaderImpl hbr = null;
@@ -262,13 +265,16 @@ public class TestHBaseStorageFlowActivity {
   }
 
   private void checkFlowActivityTable(String cluster, String user, String flow,
-      String flowVersion, long runid, Configuration c1) throws IOException {
+      String flowVersion, long runid, Configuration c1, long appCreatedTime)
+          throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
-    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    byte[] startRow =
+        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
-    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    byte[] stopRow =
+        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName
@@ -288,8 +294,7 @@ public class TestHBaseStorageFlowActivity {
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowName());
-      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-          .currentTimeMillis());
+      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
       assertEquals(1, values.size());
       checkFlowActivityRunId(runid, flowVersion, values);
@@ -319,7 +324,9 @@ public class TestHBaseStorageFlowActivity {
     long runid3 = 3333333333333L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+    long appCreatedTime = 1425016501000L;
+    TimelineEntity entityApp1 =
+        TestFlowDataGenerator.getFlowApp1(appCreatedTime);
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -348,7 +355,7 @@ public class TestHBaseStorageFlowActivity {
     }
     // check flow activity
     checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
-        runid1, flowVersion2, runid2, flowVersion3, runid3);
+        runid1, flowVersion2, runid2, flowVersion3, runid3, appCreatedTime);
 
     // use the timeline reader to verify data
     HBaseTimelineReaderImpl hbr = null;
@@ -369,8 +376,8 @@ public class TestHBaseStorageFlowActivity {
         assertEquals(cluster, flowActivity.getCluster());
         assertEquals(user, flowActivity.getUser());
         assertEquals(flow, flowActivity.getFlowName());
-        long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-            .currentTimeMillis());
+        long dayTs =
+            TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
         assertEquals(dayTs, flowActivity.getDate().getTime());
         Set<FlowRunEntity> flowRuns = flowActivity.getFlowRuns();
         assertEquals(3, flowRuns.size());
@@ -395,14 +402,17 @@ public class TestHBaseStorageFlowActivity {
 
   private void checkFlowActivityTableSeveralRuns(String cluster, String user,
       String flow, Configuration c1, String flowVersion1, long runid1,
-      String flowVersion2, long runid2, String flowVersion3, long runid3)
+      String flowVersion2, long runid2, String flowVersion3, long runid3,
+      long appCreatedTime)
       throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
-    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    byte[] startRow =
+        FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow);
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
-    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    byte[] stopRow =
+        FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow);
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName
@@ -419,8 +429,7 @@ public class TestHBaseStorageFlowActivity {
       assertEquals(cluster, flowActivityRowKey.getClusterId());
       assertEquals(user, flowActivityRowKey.getUserId());
       assertEquals(flow, flowActivityRowKey.getFlowName());
-      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(System
-          .currentTimeMillis());
+      long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime);
       assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
 
       Map<byte[], byte[]> values = result

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85513ea8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 9504799..b234bfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -52,8 +52,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -114,7 +114,7 @@ public class TestHBaseStorageFlowRun {
     String flowVersion = "CF7022C10F1354";
     long runid = 1002345678919L;
     String appName = "application_100000000000_1111";
-    long minStartTs = 10000000000000L;
+    long minStartTs = 1425026900000L;
     long greaterStartTs = 30000000000000L;
     long endTs = 1439750690000L;
     TimelineEntity entityMinStartTime = TestFlowDataGenerator