You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/04/10 02:53:53 UTC

hadoop git commit: YARN-3391. Clearly define flow ID/ flow run / flow version in API and storage. Contributed by Zhijie Shen

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 8898d14c0 -> 68c6232f8


YARN-3391. Clearly define flow ID/ flow run / flow version in API and storage. Contributed by Zhijie Shen


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/68c6232f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/68c6232f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/68c6232f

Branch: refs/heads/YARN-2928
Commit: 68c6232f8423e55b4d152ef3d1d66aeb2d6a555e
Parents: 8898d14
Author: Junping Du <ju...@apache.org>
Authored: Thu Apr 9 18:04:27 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Apr 9 18:04:27 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../applications/distributedshell/Client.java   | 36 +++++++++++++------
 .../distributedshell/TestDistributedShell.java  | 13 +++----
 .../yarn/util/timeline/TimelineUtils.java       | 34 +++++++++++++++---
 .../GetTimelineCollectorContextResponse.java    | 17 +++++----
 ...tTimelineCollectorContextResponsePBImpl.java | 38 +++++++++++++-------
 .../yarn_server_common_service_protos.proto     |  5 +--
 .../java/org/apache/hadoop/yarn/TestRPC.java    |  7 ++--
 .../collectormanager/NMCollectorService.java    |  2 +-
 .../containermanager/ContainerManagerImpl.java  | 18 ++++++----
 .../application/Application.java                |  6 ++--
 .../application/ApplicationImpl.java            | 27 +++++++++-----
 .../application/TestApplication.java            |  2 +-
 .../yarn/server/nodemanager/webapp/MockApp.java | 23 +++++++++---
 .../nodemanager/webapp/TestNMWebServices.java   |  2 +-
 .../server/resourcemanager/ClientRMService.java | 21 +++++++++++
 .../resourcemanager/amlauncher/AMLauncher.java  | 30 ++++++++--------
 .../TestTimelineServiceClientIntegration.java   |  2 +-
 .../collector/AppLevelTimelineCollector.java    | 10 +++---
 .../collector/TimelineCollector.java            |  4 +--
 .../collector/TimelineCollectorContext.java     | 32 +++++++++++------
 .../collector/TimelineCollectorManager.java     | 15 ++++----
 .../storage/FileSystemTimelineWriterImpl.java   | 13 +++----
 .../timelineservice/storage/TimelineWriter.java |  7 ++--
 ...TestPerNodeTimelineCollectorsAuxService.java |  2 +-
 .../collector/TestTimelineCollectorManager.java |  3 +-
 .../TestFileSystemTimelineWriterImpl.java       |  8 +++--
 27 files changed, 256 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5f20ca3..59a9165 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -50,6 +50,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3334. NM uses timeline client to publish container metrics to new
     timeline service. (Junping Du via zjshen)
 
+    YARN-3391. Clearly define flow ID/ flow run / flow version in API and storage.
+    (Zhijie Shen via junping_du)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index e962e71..033197f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -185,8 +185,9 @@ public class Client {
   // Timeline domain writer access control
   private String modifyACLs = null;
 
-  private String flowId = null;
-  private String flowRunId = null;
+  private String flowName = null;
+  private String flowVersion = null;
+  private long flowRunId = 0L;
 
   // Command line options
   private Options opts;
@@ -289,9 +290,11 @@ public class Client {
         + "modify the timeline entities in the given domain");
     opts.addOption("create", false, "Flag to indicate whether to create the "
         + "domain specified with -domain.");
-    opts.addOption("flow", true, "ID of the flow which the distributed shell "
+    opts.addOption("flow_name", true, "Flow name which the distributed shell "
         + "app belongs to");
-    opts.addOption("flow_run", true, "ID of the flowrun which the distributed "
+    opts.addOption("flow_version", true, "Flow version which the distributed "
+        + "shell app belongs to");
+    opts.addOption("flow_run_id", true, "Flow run ID which the distributed "
         + "shell app belongs to");
     opts.addOption("help", false, "Print usage");
     opts.addOption("node_label_expression", true,
@@ -452,11 +455,19 @@ public class Client {
       }
     }
 
-    if (cliParser.hasOption("flow")) {
-      flowId = cliParser.getOptionValue("flow");
+    if (cliParser.hasOption("flow_name")) {
+      flowName = cliParser.getOptionValue("flow_name");
+    }
+    if (cliParser.hasOption("flow_version")) {
+      flowVersion = cliParser.getOptionValue("flow_version");
     }
-    if (cliParser.hasOption("flow_run")) {
-      flowRunId = cliParser.getOptionValue("flow_run");
+    if (cliParser.hasOption("flow_run_id")) {
+      try {
+        flowRunId = Long.valueOf(cliParser.getOptionValue("flow_run_id"));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(
+            "Flow run is not a valid long value", e);
+      }
     }
     return true;
   }
@@ -550,10 +561,13 @@ public class Client {
     }
 
     Set<String> tags = new HashSet<String>();
-    if (flowId != null) {
-      tags.add(TimelineUtils.generateFlowIdTag(flowId));
+    if (flowName != null) {
+      tags.add(TimelineUtils.generateFlowNameTag(flowName));
+    }
+    if (flowVersion != null) {
+      tags.add(TimelineUtils.generateFlowVersionTag(flowVersion));
     }
-    if (flowRunId != null) {
+    if (flowRunId != 0) {
       tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
     }
     appContext.setApplicationTags(tags);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index daaad7c..cc5f5e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -237,9 +237,11 @@ public class TestDistributedShell {
       args = mergeArgs(args, timelineArgs);
       if (!defaultFlow) {
         String[] flowArgs = {
-            "--flow",
-            "test_flow_id",
-            "--flow_run",
+            "--flow_name",
+            "test_flow_name",
+            "--flow_version",
+            "test_flow_version",
+            "--flow_run_id",
             "12345678"
         };
         args = mergeArgs(args, flowArgs);
@@ -368,7 +370,8 @@ public class TestDistributedShell {
           UserGroupInformation.getCurrentUser().getShortUserName() +
           (defaultFlow ? "/" +
               TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
-              "/0/" : "/test_flow_id/12345678/") + appId.toString();
+              "/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
+              appId.toString();
       // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
       String outputDirApp = basePath + "/DS_APP_ATTEMPT/";
 
@@ -393,8 +396,6 @@ public class TestDistributedShell {
       String containerFileName = outputDirContainer + containerTimestampFileName;
       File containerFile = new File(containerFileName);
       Assert.assertTrue(containerFile.exists());
-      String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
-          + "_";
 
       // Verify NM posting container metrics info.
       String outputDirContainerMetrics = basePath + "/" + 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
index 772c92a..f198ba1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
@@ -41,7 +41,8 @@ import org.codehaus.jackson.map.ObjectMapper;
 @Evolving
 public class TimelineUtils {
 
-  public static final String FLOW_ID_TAG_PREFIX = "TIMELINE_FLOW_ID_TAG";
+  public static final String FLOW_NAME_TAG_PREFIX = "TIMELINE_FLOW_NAME_TAG";
+  public static final String FLOW_VERSION_TAG_PREFIX = "TIMELINE_FLOW_VERSION_TAG";
   public static final String FLOW_RUN_ID_TAG_PREFIX = "TIMELINE_FLOW_RUN_ID_TAG";
 
   private static ObjectMapper mapper;
@@ -114,11 +115,36 @@ public class TimelineUtils {
     return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
   }
 
-  public static String generateFlowIdTag(String flowId) {
-    return FLOW_ID_TAG_PREFIX + ":" + flowId;
+  /**
+   * Generate flow name tag
+   *
+   * @param flowName flow name that identifies a distinct flow application which
+   *                 can be run repeatedly over time
+   * @return
+   */
+  public static String generateFlowNameTag(String flowName) {
+    return FLOW_NAME_TAG_PREFIX + ":" + flowName;
+  }
+
+  /**
+   * Generate flow version tag
+   *
+   * @param flowVersion flow version that keeps track of the changes made to the
+   *                    flow
+   * @return
+   */
+  public static String generateFlowVersionTag(String flowVersion) {
+    return FLOW_VERSION_TAG_PREFIX + ":" + flowVersion;
   }
 
-  public static String generateFlowRunIdTag(String flowRunId) {
+  /**
+   * Generate flow run ID tag
+   *
+   * @param flowRunId flow run ID that identifies one instance (or specific
+   *                  execution) of that flow
+   * @return
+   */
+  public static String generateFlowRunIdTag(long flowRunId) {
     return FLOW_RUN_ID_TAG_PREFIX + ":" + flowRunId;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
index 1558e2f..bd5c11e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
@@ -23,11 +23,12 @@ import org.apache.hadoop.yarn.util.Records;
 public abstract class GetTimelineCollectorContextResponse {
 
   public static GetTimelineCollectorContextResponse newInstance(
-      String userId, String flowId, String flowRunId) {
+      String userId, String flowName, String flowVersion, long flowRunId) {
     GetTimelineCollectorContextResponse response =
         Records.newRecord(GetTimelineCollectorContextResponse.class);
     response.setUserId(userId);
-    response.setFlowId(flowId);
+    response.setFlowName(flowName);
+    response.setFlowVersion(flowVersion);
     response.setFlowRunId(flowRunId);
     return response;
   }
@@ -36,11 +37,15 @@ public abstract class GetTimelineCollectorContextResponse {
 
   public abstract void setUserId(String userId);
 
-  public abstract String getFlowId();
+  public abstract String getFlowName();
 
-  public abstract void setFlowId(String flowId);
+  public abstract void setFlowName(String flowName);
 
-  public abstract String getFlowRunId();
+  public abstract String getFlowVersion();
 
-  public abstract void setFlowRunId(String flowRunId);
+  public abstract void setFlowVersion(String flowVersion);
+
+  public abstract long getFlowRunId();
+
+  public abstract void setFlowRunId(long flowRunId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
index 6dc1f77..34713cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
@@ -102,40 +102,52 @@ public class GetTimelineCollectorContextResponsePBImpl extends
   }
 
   @Override
-  public String getFlowId() {
+  public String getFlowName() {
     GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasFlowId()) {
+    if (!p.hasFlowName()) {
       return null;
     }
-    return p.getFlowId();
+    return p.getFlowName();
   }
 
   @Override
-  public void setFlowId(String flowId) {
+  public void setFlowName(String flowName) {
     maybeInitBuilder();
-    if (flowId == null) {
-      builder.clearFlowId();
+    if (flowName == null) {
+      builder.clearFlowName();
       return;
     }
-    builder.setFlowId(flowId);
+    builder.setFlowName(flowName);
   }
 
   @Override
-  public String getFlowRunId() {
+  public String getFlowVersion() {
     GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
-    if (!p.hasFlowRunId()) {
+    if (!p.hasFlowVersion()) {
       return null;
     }
-    return p.getFlowRunId();
+    return p.getFlowVersion();
   }
 
   @Override
-  public void setFlowRunId(String flowRunId) {
+  public void setFlowVersion(String flowVersion) {
     maybeInitBuilder();
-    if (flowRunId == null) {
-      builder.clearFlowRunId();
+    if (flowVersion == null) {
+      builder.clearFlowVersion();
       return;
     }
+    builder.setFlowVersion(flowVersion);
+  }
+
+  @Override
+  public long getFlowRunId() {
+    GetTimelineCollectorContextResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getFlowRunId();
+  }
+
+  @Override
+  public void setFlowRunId(long flowRunId) {
+    maybeInitBuilder();
     builder.setFlowRunId(flowRunId);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 7b85d0d..a6d5945 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -97,8 +97,9 @@ message GetTimelineCollectorContextRequestProto {
 
 message GetTimelineCollectorContextResponseProto {
   optional string user_id = 1;
-  optional string flow_id = 2;
-  optional string flow_run_id = 3;
+  optional string flow_name = 2;
+  optional string flow_version = 3;
+  optional int64 flow_run_id = 4;
 }
 
 message NMContainerStatusProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index 3c9f57b..52ecd73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -176,8 +176,9 @@ public class TestRPC {
       GetTimelineCollectorContextResponse response =
           proxy.getTimelineCollectorContext(request);
       Assert.assertEquals("test_user_id", response.getUserId());
-      Assert.assertEquals("test_flow_id", response.getFlowId());
-      Assert.assertEquals("test_flow_run_id", response.getFlowRunId());
+      Assert.assertEquals("test_flow_name", response.getFlowName());
+      Assert.assertEquals("test_flow_version", response.getFlowVersion());
+      Assert.assertEquals(12345678L, response.getFlowRunId());
     } catch (YarnException | IOException e) {
       Assert.fail("RPC call failured is not expected here.");
     }
@@ -374,7 +375,7 @@ public class TestRPC {
         throws  YarnException, IOException {
       if (request.getApplicationId().getId() == 1) {
          return GetTimelineCollectorContextResponse.newInstance(
-                "test_user_id", "test_flow_id", "test_flow_run_id");
+                "test_user_id", "test_flow_name", "test_flow_version", 12345678L);
       } else {
         throw new YarnException("The application is not found.");
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
index f37be23..dc5601f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -130,6 +130,6 @@ public class NMCollectorService extends CompositeService implements
           " doesn't exist on NM.");
     }
     return GetTimelineCollectorContextResponse.newInstance(
-        app.getUser(), app.getFlowId(), app.getFlowRunId());
+        app.getUser(), app.getFlowName(), app.getFlowVersion(), app.getFlowRunId());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 6ac15a6..e3470e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -296,7 +296,7 @@ public class ContainerManagerImpl extends CompositeService implements
     LOG.info("Recovering application " + appId);
     //TODO: Recover flow and flow run ID
     ApplicationImpl app = new ApplicationImpl(
-        dispatcher, p.getUser(), null, null, appId, creds, context);
+        dispatcher, p.getUser(), null, null, 0, appId, creds, context);
     context.getApplications().put(appId, app);
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
@@ -851,12 +851,18 @@ public class ContainerManagerImpl extends CompositeService implements
     try {
       if (!serviceStopped) {
         // Create the application
-        String flowId = launchContext.getEnvironment().get(
-            TimelineUtils.FLOW_ID_TAG_PREFIX);
-        String flowRunId = launchContext.getEnvironment().get(
+        String flowName = launchContext.getEnvironment().get(
+            TimelineUtils.FLOW_NAME_TAG_PREFIX);
+        String flowVersion = launchContext.getEnvironment().get(
+            TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+        String flowRunIdStr = launchContext.getEnvironment().get(
             TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
-        Application application = new ApplicationImpl(
-            dispatcher, user, flowId, flowRunId, applicationID, credentials, context);
+        long flowRunId = 0L;
+        if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+          flowRunId = Long.valueOf(flowRunIdStr);
+        }
+        Application application = new ApplicationImpl(dispatcher, user,
+            flowName, flowVersion, flowRunId, applicationID, credentials, context);
         if (null == context.getApplications().putIfAbsent(applicationID,
           application)) {
           LOG.info("Creating a new application reference for app " + applicationID);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
index 0c95193..5de3398 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
@@ -36,9 +36,11 @@ public interface Application extends EventHandler<ApplicationEvent> {
 
   ApplicationState getApplicationState();
 
-  String getFlowId();
+  String getFlowName();
 
-  String getFlowRunId();
+  String getFlowVersion();
+
+  long getFlowRunId();
   
   TimelineClient getTimelineClient();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 72f4eea..91e3935 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -68,8 +68,9 @@ public class ApplicationImpl implements Application {
 
   final Dispatcher dispatcher;
   final String user;
-  final String flowId;
-  final String flowRunId;
+  final String flowName;
+  final String flowVersion;
+  final long flowRunId;
   final ApplicationId appId;
   final Credentials credentials;
   Map<ApplicationAccessType, String> applicationACLs;
@@ -86,12 +87,13 @@ public class ApplicationImpl implements Application {
   Map<ContainerId, Container> containers =
       new HashMap<ContainerId, Container>();
 
-  public ApplicationImpl(Dispatcher dispatcher, String user, String flowId,
-      String flowRunId, ApplicationId appId, Credentials credentials,
-      Context context) {
+  public ApplicationImpl(Dispatcher dispatcher, String user, String flowName,
+      String flowVersion, long flowRunId, ApplicationId appId,
+      Credentials credentials, Context context) {
     this.dispatcher = dispatcher;
     this.user = user;
-    this.flowId = flowId;
+    this.flowName = flowName;
+    this.flowVersion = flowVersion;
     this.flowRunId = flowRunId;
     this.appId = appId;
     this.credentials = credentials;
@@ -518,11 +520,18 @@ public class ApplicationImpl implements Application {
     }
   }
 
-  public String getFlowId() {
-    return flowId;
+  @Override
+  public String getFlowName() {
+    return flowName;
   }
 
-  public String getFlowRunId() {
+  @Override
+  public String getFlowVersion() {
+    return flowVersion;
+  }
+
+  @Override
+  public long getFlowRunId() {
     return flowRunId;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 5303df5..3889d2e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -531,7 +531,7 @@ public class TestApplication {
       this.appId = BuilderUtils.newApplicationId(timestamp, id);
 
       app = new ApplicationImpl(
-          dispatcher, this.user, null, null, appId, null, context);
+          dispatcher, this.user, null, null, 0, appId, null, context);
       containers = new ArrayList<Container>();
       for (int i = 0; i < numContainers; i++) {
         Container container = createMockedContainer(this.appId, i);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
index 2ee572b..4d1be84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
@@ -40,8 +40,9 @@ public class MockApp implements Application {
   Map<ContainerId, Container> containers = new HashMap<ContainerId, Container>();
   ApplicationState appState;
   Application app;
-  String flowId;
-  String flowRunId;
+  String flowName;
+  String flowVersion;
+  long flowRunId;
   TimelineClient timelineClient = null;
 
   public MockApp(int uniqId) {
@@ -59,6 +60,14 @@ public class MockApp implements Application {
     appState = ApplicationState.NEW;
   }
 
+  public MockApp(String user, long clusterTimeStamp, int uniqId,
+      String flowName, String flowVersion, long flowRunId) {
+    this(user, clusterTimeStamp, uniqId);
+    this.flowName = flowName;
+    this.flowVersion = flowVersion;
+    this.flowRunId = flowRunId;
+  }
+
   public void setState(ApplicationState state) {
     this.appState = state;
   }
@@ -81,11 +90,15 @@ public class MockApp implements Application {
 
   public void handle(ApplicationEvent event) {}
 
-  public String getFlowId() {
-    return flowId;
+  public String getFlowName() {
+    return flowName;
+  }
+
+  public String getFlowVersion() {
+    return flowVersion;
   }
 
-  public String getFlowRunId() {
+  public long getFlowRunId() {
     return flowRunId;
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 060f552..ccd831c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -327,7 +327,7 @@ public class TestNMWebServices extends JerseyTestBase {
     final String filename = "logfile1";
     final String logMessage = "log message\n";
     nmContext.getApplications().put(appId, new ApplicationImpl(null, "user",
-        null, null, appId, null, nmContext));
+        null, null, 0, appId, null, nmContext));
     
     MockContainer container = new MockContainer(appAttemptId,
         new AsyncDispatcher(), new Configuration(), "user", appId, 1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 21d70b4..8adc341 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -151,6 +151,7 @@ import org.apache.hadoop.yarn.util.UTCClock;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 
 /**
@@ -551,6 +552,26 @@ public class ClientRMService extends AbstractService implements
       throw RPCUtil.getRemoteException(ie);
     }
 
+    // Sanity check for flow run
+    String value = null;
+    try {
+      for (String tag : submissionContext.getApplicationTags()) {
+        if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") ||
+            tag.startsWith(
+                TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
+          value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1);
+          Long.valueOf(value);
+        }
+      }
+    } catch (NumberFormatException e) {
+      LOG.warn("Invalid to flow run: " + value +
+          ". Flow run should be a long integer", e);
+      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
+          e.getMessage(), "ClientRMService",
+          "Exception in submitting application", applicationId);
+      throw RPCUtil.getRemoteException(e);
+    }
+
     // Check whether app has already been put into rmContext,
     // If it is, simply return the response
     if (rmContext.getRMApps().get(applicationId) != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index 1a8bb9a..b5021ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -221,22 +221,9 @@ public class AMLauncher implements Runnable {
     // Set flow context info
     for (String tag :
         rmContext.getRMApps().get(applicationId).getApplicationTags()) {
-      if (tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX  + ":") ||
-          tag.startsWith(TimelineUtils.FLOW_ID_TAG_PREFIX.toLowerCase() + ":")) {
-        String value = tag.substring(
-            TimelineUtils.FLOW_ID_TAG_PREFIX.length() + 1);
-        if (!value.isEmpty()) {
-          environment.put(TimelineUtils.FLOW_ID_TAG_PREFIX, value);
-        }
-      }
-      if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX  + ":") ||
-          tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) {
-        String value = tag.substring(
-            TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() + 1);
-        if (!value.isEmpty()) {
-          environment.put(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, value);
-        }
-      }
+      setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag);
+      setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag);
+      setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag);
     }
     Credentials credentials = new Credentials();
     DataInputByteBuffer dibb = new DataInputByteBuffer();
@@ -256,6 +243,17 @@ public class AMLauncher implements Runnable {
     container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
   }
 
+  private static void setFlowTags(
+      Map<String, String> environment, String tagPrefix, String tag) {
+    if (tag.startsWith(tagPrefix + ":") ||
+        tag.startsWith(tagPrefix.toLowerCase() + ":")) {
+      String value = tag.substring(tagPrefix.length() + 1);
+      if (!value.isEmpty()) {
+        environment.put(tagPrefix, value);
+      }
+    }
+  }
+
   @VisibleForTesting
   protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
     Token<AMRMTokenIdentifier> amrmToken =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index c8b9625..54c806c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -97,7 +97,7 @@ public class TestTimelineServiceClientIntegration {
           mock(CollectorNodemanagerProtocol.class);
       try {
         GetTimelineCollectorContextResponse response =
-            GetTimelineCollectorContextResponse.newInstance(null, null, null);
+            GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L);
         when(protocol.getTimelineCollectorContext(any(
             GetTimelineCollectorContextRequest.class))).thenReturn(response);
       } catch (YarnException | IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index 60ddde5..5bc70e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -54,10 +54,12 @@ public class AppLevelTimelineCollector extends TimelineCollector {
     // context info from NM.
     // Current user usually is not the app user, but keep this field non-null
     context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
-    // Use app ID to generate a default flow ID for orphan app
-    context.setFlowId(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId));
-    // Set the flow run ID to 0 if it's an orphan app
-    context.setFlowRunId("0");
+    // Use app ID to generate a default flow name for orphan app
+    context.setFlowName(TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId));
+    // Set the flow version to string 1 if it's an orphan app
+    context.setFlowVersion("1");
+    // Set the flow run ID to 1 if it's an orphan app
+    context.setFlowRunId(1L);
     context.setAppId(appId.toString());
     super.serviceInit(conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 677feb1..f1d3d72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -100,8 +100,8 @@ public abstract class TimelineCollector extends CompositeService {
 
     TimelineCollectorContext context = getTimelineEntityContext();
     return writer.write(context.getClusterId(), context.getUserId(),
-        context.getFlowId(), context.getFlowRunId(), context.getAppId(),
-        entities);
+        context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(),
+        context.getAppId(), entities);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
index c1a10a6..6cc477f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
@@ -22,19 +22,21 @@ public class TimelineCollectorContext {
 
   private String clusterId;
   private String userId;
-  private String flowId;
-  private String flowRunId;
+  private String flowName;
+  private String flowVersion;
+  private long flowRunId;
   private String appId;
 
   public TimelineCollectorContext() {
-    this(null, null, null, null, null);
+    this(null, null, null, null, 0L, null);
   }
 
   public TimelineCollectorContext(String clusterId, String userId,
-      String flowId, String flowRunId, String appId) {
+      String flowName, String flowVersion, long flowRunId, String appId) {
     this.clusterId = clusterId;
     this.userId = userId;
-    this.flowId = flowId;
+    this.flowName = flowName;
+    this.flowVersion = flowVersion;
     this.flowRunId = flowRunId;
     this.appId = appId;
   }
@@ -55,19 +57,27 @@ public class TimelineCollectorContext {
     this.userId = userId;
   }
 
-  public String getFlowId() {
-    return flowId;
+  public String getFlowName() {
+    return flowName;
   }
 
-  public void setFlowId(String flowId) {
-    this.flowId = flowId;
+  public void setFlowName(String flowName) {
+    this.flowName = flowName;
   }
 
-  public String getFlowRunId() {
+  public String getFlowVersion() {
+    return flowVersion;
+  }
+
+  public void setFlowVersion(String flowVersion) {
+    this.flowVersion = flowVersion;
+  }
+
+  public long getFlowRunId() {
     return flowRunId;
   }
 
-  public void setFlowRunId(String flowRunId) {
+  public void setFlowRunId(long flowRunId) {
     this.flowRunId = flowRunId;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 5f23c25..9a566a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.http.lib.StaticUserWebFilter;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -273,12 +272,16 @@ public class TimelineCollectorManager extends CompositeService {
     if (userId != null && !userId.isEmpty()) {
       collector.getTimelineEntityContext().setUserId(userId);
     }
-    String flowId = response.getFlowId();
-    if (flowId != null && !flowId.isEmpty()) {
-      collector.getTimelineEntityContext().setFlowId(flowId);
+    String flowName = response.getFlowName();
+    if (flowName != null && !flowName.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowName(flowName);
     }
-    String flowRunId = response.getFlowRunId();
-    if (flowRunId != null && !flowRunId.isEmpty()) {
+    String flowVersion = response.getFlowVersion();
+    if (flowVersion != null && !flowVersion.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowVersion(flowVersion);
+    }
+    long flowRunId = response.getFlowRunId();
+    if (flowRunId != 0L) {
       collector.getTimelineEntityContext().setFlowRunId(flowRunId);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 41b6ac9..dd8ad06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -65,22 +65,23 @@ public class FileSystemTimelineWriterImpl extends AbstractService
 
   @Override
   public TimelineWriteResponse write(String clusterId, String userId,
-      String flowId, String flowRunId, String appId,
+      String flowName, String flowVersion, long flowRunId, String appId,
       TimelineEntities entities) throws IOException {
     TimelineWriteResponse response = new TimelineWriteResponse();
     for (TimelineEntity entity : entities.getEntities()) {
-      write(clusterId, userId, flowId, flowRunId, appId, entity, response);
+      write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
+          response);
     }
     return response;
   }
 
-  private void write(String clusterId, String userId,
-      String flowId, String flowRunId, String appId, TimelineEntity entity,
+  private void write(String clusterId, String userId, String flowName,
+      String flowVersion, long flowRun, String appId, TimelineEntity entity,
       TimelineWriteResponse response) throws IOException {
     PrintWriter out = null;
     try {
-      String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowId,
-          flowRunId, appId, entity.getType());
+      String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowName,
+          flowVersion, String.valueOf(flowRun), appId, entity.getType());
       String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION;
       out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)));
       out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 492e3a9..467bcec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -41,8 +41,9 @@ public interface TimelineWriter extends Service {
    *
    * @param clusterId context cluster ID
    * @param userId context user ID
-   * @param flowId context flow ID
-   * @param flowRunId context flow run ID
+   * @param flowName context flow name
+   * @param flowVersion context flow version
+   * @param flowRunId
    * @param appId context app ID
    * @param data
    *          a {@link TimelineEntities} object.
@@ -50,7 +51,7 @@ public interface TimelineWriter extends Service {
    * @throws IOException
    */
   TimelineWriteResponse write(String clusterId, String userId,
-      String flowId, String flowRunId, String appId,
+      String flowName, String flowVersion, long flowRunId, String appId,
       TimelineEntities data) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
index 1de8d6d..abbe13a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -162,7 +162,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
     CollectorNodemanagerProtocol nmCollectorService =
         mock(CollectorNodemanagerProtocol.class);
     GetTimelineCollectorContextResponse response =
-        GetTimelineCollectorContextResponse.newInstance(null, null, null);
+        GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L);
     try {
       when(nmCollectorService.getTimelineCollectorContext(any(
           GetTimelineCollectorContextRequest.class))).thenReturn(response);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
index 36bda85..c662998 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -146,7 +147,7 @@ public class TestTimelineCollectorManager {
     CollectorNodemanagerProtocol nmCollectorService =
         mock(CollectorNodemanagerProtocol.class);
     GetTimelineCollectorContextResponse response =
-        GetTimelineCollectorContextResponse.newInstance(null, null, null);
+        GetTimelineCollectorContextResponse.newInstance(null, null, null, 0L);
     try {
       when(nmCollectorService.getTimelineCollectorContext(any(
           GetTimelineCollectorContextRequest.class))).thenReturn(response);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/68c6232f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 407b5f6..50a9f60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -57,11 +57,13 @@ public class TestFileSystemTimelineWriterImpl {
       fsi = new FileSystemTimelineWriterImpl();
       fsi.init(new YarnConfiguration());
       fsi.start();
-      fsi.write("cluster_id", "user_id", "flow_id", "flow_run_id", "app_id", te);
+      fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L,
+          "app_id", te);
 
       String fileName = fsi.getOutputRoot() +
-          "/entities/cluster_id/user_id/flow_id/flow_run_id/app_id/" + type +
-          "/" + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" +
+          type + "/" + id +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
       Path path = Paths.get(fileName);
       File f = new File(fileName);
       assertTrue(f.exists() && !f.isDirectory());