Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/12/23 14:24:18 UTC

[2/2] hadoop git commit: YARN-4234. New put APIs in TimelineClient for ats v1.5. Contributed by Xuan Gong. (cherry picked from commit 882f2f04644a13cadb93070d5545f7a4f8691fde) (cherry picked from commit 6e97a3c9686b847e6047e12b7e53a8316e2bebfa)

YARN-4234. New put APIs in TimelineClient for ats v1.5. Contributed by Xuan Gong.
(cherry picked from commit 882f2f04644a13cadb93070d5545f7a4f8691fde)
(cherry picked from commit 6e97a3c9686b847e6047e12b7e53a8316e2bebfa)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/630b637f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/630b637f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/630b637f

Branch: refs/heads/branch-2.8
Commit: 630b637ff3cc3ad087a2e0318c743cbe285744f7
Parents: 263c544
Author: Junping Du <ju...@apache.org>
Authored: Wed Dec 23 05:26:51 2015 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Wed Dec 23 05:31:16 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../records/timeline/TimelineEntityGroupId.java | 163 ++++
 .../hadoop/yarn/conf/YarnConfiguration.java     |  53 +-
 .../hadoop/yarn/client/api/TimelineClient.java  |  43 +
 .../client/api/impl/DirectTimelineWriter.java   |  66 ++
 .../api/impl/FileSystemTimelineWriter.java      | 847 +++++++++++++++++++
 .../client/api/impl/TimelineClientImpl.java     | 128 +--
 .../yarn/client/api/impl/TimelineWriter.java    | 142 ++++
 .../yarn/api/TestTimelineEntityGroupId.java     |  52 ++
 .../client/api/impl/TestTimelineClient.java     |  59 +-
 .../api/impl/TestTimelineClientForATS1_5.java   | 225 +++++
 .../webapp/TestTimelineWebServicesWithSSL.java  |  21 +-
 q                                               | 222 +++++
 13 files changed, 1931 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 57227da..9875255 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -208,6 +208,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3458. CPU resource monitoring in Windows. (Inigo Goiri via cnauroth)
 
+    YARN-4234. New put APIs in TimelineClient for ats v1.5. (Xuan Gong via
+    junping_du)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java
new file mode 100644
index 0000000..984e6e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineEntityGroupId.java
@@ -0,0 +1,163 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records.timeline;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import com.google.common.base.Splitter;
+
+/**
+ * <p><code>TimelineEntityGroupId</code> is an abstract way for
+ * timeline service users to represent "a group of related timeline data".
+ * For example, all entities that represent one data flow DAG execution
+ * can be grouped into one timeline entity group.</p>
+ */
+@Public
+@Unstable
+public class TimelineEntityGroupId implements
+    Comparable<TimelineEntityGroupId> {
+
+  private static final Splitter SPLITTER = Splitter.on('_').trimResults();
+
+  private ApplicationId applicationId;
+  private String id;
+
+  @Private
+  @Unstable
+  public static final String TIMELINE_ENTITY_GROUPID_STR_PREFIX =
+      "timelineEntityGroupId";
+
+  public TimelineEntityGroupId() {
+
+  }
+
+  public static TimelineEntityGroupId newInstance(ApplicationId applicationId,
+      String id) {
+    TimelineEntityGroupId timelineEntityGroupId =
+        new TimelineEntityGroupId();
+    timelineEntityGroupId.setApplicationId(applicationId);
+    timelineEntityGroupId.setTimelineEntityGroupId(id);
+    return timelineEntityGroupId;
+  }
+
+  /**
+   * Get the <code>ApplicationId</code> of the
+   * <code>TimelineEntityGroupId</code>.
+   *
+   * @return <code>ApplicationId</code> of the
+   *         <code>TimelineEntityGroupId</code>
+   */
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+  public void setApplicationId(ApplicationId appID) {
+    this.applicationId = appID;
+  }
+
+  /**
+   * Get the <code>timelineEntityGroupId</code>.
+   *
+   * @return <code>timelineEntityGroupId</code>
+   */
+  public String getTimelineEntityGroupId() {
+    return this.id;
+  }
+
+  @Private
+  @Unstable
+  protected void setTimelineEntityGroupId(String timelineEntityGroupId) {
+    this.id = timelineEntityGroupId;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getTimelineEntityGroupId().hashCode();
+    result = 31 * result + getApplicationId().hashCode();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    TimelineEntityGroupId otherObject = (TimelineEntityGroupId) obj;
+    if (!this.getApplicationId().equals(otherObject.getApplicationId())) {
+      return false;
+    }
+    if (!this.getTimelineEntityGroupId().equals(
+        otherObject.getTimelineEntityGroupId())) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int compareTo(TimelineEntityGroupId other) {
+    int compareAppIds =
+        this.getApplicationId().compareTo(other.getApplicationId());
+    if (compareAppIds == 0) {
+      return this.getTimelineEntityGroupId().compareTo(
+        other.getTimelineEntityGroupId());
+    } else {
+      return compareAppIds;
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(TIMELINE_ENTITY_GROUPID_STR_PREFIX + "_");
+    ApplicationId appId = getApplicationId();
+    sb.append(appId.getClusterTimestamp()).append("_");
+    sb.append(appId.getId()).append("_");
+    sb.append(getTimelineEntityGroupId());
+    return sb.toString();
+  }
+
+  public static TimelineEntityGroupId
+      fromString(String timelineEntityGroupIdStr) {
+    StringBuffer buf = new StringBuffer();
+    Iterator<String> it = SPLITTER.split(timelineEntityGroupIdStr).iterator();
+    if (!it.next().equals(TIMELINE_ENTITY_GROUPID_STR_PREFIX)) {
+      throw new IllegalArgumentException(
+        "Invalid TimelineEntityGroupId prefix: " + timelineEntityGroupIdStr);
+    }
+    ApplicationId appId =
+        ApplicationId.newInstance(Long.parseLong(it.next()),
+          Integer.parseInt(it.next()));
+    buf.append(it.next());
+    while (it.hasNext()) {
+      buf.append("_");
+      buf.append(it.next());
+    }
+    return TimelineEntityGroupId.newInstance(appId, buf.toString());
+  }
+}
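
For illustration, a minimal round-trip of the new record (a sketch; the
timestamp and ids below are made up):

    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;

    public class GroupIdRoundTrip {
      public static void main(String[] args) {
        ApplicationId appId = ApplicationId.newInstance(1450875851000L, 1);
        TimelineEntityGroupId groupId =
            TimelineEntityGroupId.newInstance(appId, "dag_0001");
        // toString() emits
        // "timelineEntityGroupId_1450875851000_1_dag_0001"; fromString()
        // parses it back, rejoining any '_' inside the trailing group id.
        TimelineEntityGroupId parsed =
            TimelineEntityGroupId.fromString(groupId.toString());
        System.out.println(groupId.equals(parsed));  // prints true
      }
    }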

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 35ad12f..6299a38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1557,6 +1557,10 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX =
       TIMELINE_SERVICE_PREFIX + "ui-web-path.";
 
+  /** Timeline client settings. */
+  public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
+      TIMELINE_SERVICE_PREFIX + "client.";
+
   /**
    * Path to war file or static content directory for this UI
    * (For pluggable UIs).
@@ -1564,6 +1568,45 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_UI_ON_DISK_PATH_PREFIX =
       TIMELINE_SERVICE_PREFIX + "ui-on-disk-path.";
 
+  /**
+   * Settings for timeline service v1.5.
+   */
+  public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX =
+      TIMELINE_SERVICE_PREFIX + "entity-group-fs-store.";
+
+  public static final String TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "active-dir";
+
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT =
+      "/tmp/entity-file-history/active";
+
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "retry-policy-spec";
+  public static final String
+      DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC =
+      "2000, 500";
+
+  public static final String
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES =
+      TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "summary-entity-types";
+
+  public static final String TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS =
+      TIMELINE_SERVICE_CLIENT_PREFIX + "fd-flush-interval-secs";
+  public static final long
+      TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT = 10;
+
+  public static final String TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS =
+      TIMELINE_SERVICE_CLIENT_PREFIX + "fd-clean-interval-secs";
+  public static final long
+      TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT = 60;
+
+  public static final String TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS =
+      TIMELINE_SERVICE_CLIENT_PREFIX + "fd-retain-secs";
+  public static final long TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT =
+      5 * 60;
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private
@@ -1604,8 +1647,8 @@ public class YarnConfiguration extends Configuration {
   public static final String FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
       APPLICATION_HISTORY_PREFIX + "fs-history-store.compression-type";
   @Private
-  public static final String DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE =
-      "none";
+  public static final String
+      DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE = "none";
 
   /** The setting that controls whether timeline service is enabled or not. */
   public static final String TIMELINE_SERVICE_ENABLED =
@@ -1654,7 +1697,7 @@ public class YarnConfiguration extends Configuration {
       APPLICATION_HISTORY_PREFIX + "max-applications";
   public static final long DEFAULT_APPLICATION_HISTORY_MAX_APPS = 10000;
 
-  /** Timeline service store class */
+  /** Timeline service store class. */
   public static final String TIMELINE_SERVICE_STORE =
       TIMELINE_SERVICE_PREFIX + "store-class";
 
@@ -1767,10 +1810,6 @@ public class YarnConfiguration extends Configuration {
   public static final boolean
       TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false;
 
-  /** Timeline client settings */
-  public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
-      TIMELINE_SERVICE_PREFIX + "client.";
-
   /** Timeline client call, max retries (-1 means no limit) */
   public static final String TIMELINE_SERVICE_CLIENT_MAX_RETRIES =
       TIMELINE_SERVICE_CLIENT_PREFIX + "max-retries";
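
The new client knobs resolve against the defaults added above; a sketch of
reading them (constant names are the ones introduced in this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class V15ClientSettings {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // Seconds between periodic hflush() calls on cached log streams
        // (default 10).
        long flushSecs = conf.getLong(
            YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS,
            YarnConfiguration
                .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT);
        // Active dir for v1.5 entity files; it must already exist, or the
        // FileSystemTimelineWriter constructor throws IOException.
        String activeDir = conf.get(
            YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
            YarnConfiguration
                .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT);
        System.out.println(flushSecs + " " + activeDir);
      }
    }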

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index a3766f9..258b9f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -26,8 +26,10 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -80,6 +82,28 @@ public abstract class TimelineClient extends AbstractService {
 
   /**
    * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * server. It is a blocking API. The method will not return until it gets the
+   * response from the timeline server.
+   *
+   * This API is only for timeline service v1.5
+   * </p>
+   *
+   * @param appAttemptId {@link ApplicationAttemptId}
+   * @param groupId {@link TimelineEntityGroupId}
+   * @param entities
+   *          the collection of {@link TimelineEntity}
+   * @return the error information if the sent entities are not correctly stored
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract TimelinePutResponse putEntities(
+      ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
+      TimelineEntity... entities) throws IOException, YarnException;
+
+  /**
+   * <p>
    * Send the information of a domain to the timeline server. It is a
    * blocking API. The method will not return until it gets the response from
    * the timeline server.
@@ -96,6 +120,25 @@ public abstract class TimelineClient extends AbstractService {
 
   /**
    * <p>
+   * Send the information of a domain to the timeline server. It is a
+   * blocking API. The method will not return until it gets the response from
+   * the timeline server.
+   *
+   * This API is only for timeline service v1.5
+   * </p>
+   *
+   * @param domain
+   *          a {@link TimelineDomain} object
+   * @param appAttemptId {@link ApplicationAttemptId}
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract void putDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException, YarnException;
+
+  /**
+   * <p>
    * Get a delegation token so as to be able to talk to the timeline server in a
    * secure way.
    * </p>
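
Putting the two new abstract methods together, a caller would drive them
roughly as follows (a sketch that assumes yarn.timeline-service.version is
set to 1.5 and a timeline server is reachable; all ids are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
    import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
    import org.apache.hadoop.yarn.client.api.TimelineClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class TimelineV15Example {
      public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();
        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
        conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);

        TimelineClient client = TimelineClient.createTimelineClient();
        client.init(conf);
        client.start();
        try {
          ApplicationId appId = ApplicationId.newInstance(1450875851000L, 1);
          ApplicationAttemptId attemptId =
              ApplicationAttemptId.newInstance(appId, 1);
          TimelineEntityGroupId groupId =
              TimelineEntityGroupId.newInstance(appId, "dag_0001");

          TimelineEntity entity = new TimelineEntity();
          entity.setEntityType("DAG");
          entity.setEntityId("dag_0001");

          // Blocking; the response carries errors for entities that were
          // not stored correctly.
          TimelinePutResponse response =
              client.putEntities(attemptId, groupId, entity);
          System.out.println(response.getErrors());
        } finally {
          client.stop();
        }
      }
    }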

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java
new file mode 100644
index 0000000..abc2a28
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/DirectTimelineWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.sun.jersey.api.client.Client;
+
+/**
+ * A simple writer that posts Timeline data directly to the timeline server.
+ */
+@Private
+@Unstable
+public class DirectTimelineWriter extends TimelineWriter {
+
+  private static final Log LOG = LogFactory
+      .getLog(DirectTimelineWriter.class);
+
+  public DirectTimelineWriter(UserGroupInformation authUgi,
+      Client client, URI resURI) {
+    super(authUgi, client, resURI);
+  }
+
+  @Override
+  public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
+      TimelineEntityGroupId groupId, TimelineEntity... entities)
+      throws IOException, YarnException {
+    throw new IOException("Not supported");
+  }
+
+  @Override
+  public void putDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException, YarnException {
+    throw new IOException("Not supported");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
new file mode 100644
index 0000000..1c295e1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java
@@ -0,0 +1,847 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig.Feature;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import com.sun.jersey.api.client.Client;
+
+/**
+ * A simple writer class for storing Timeline data in any storage that
+ * implements a basic FileSystem interface.
+ * This writer is used for ATSv1.5.
+ */
+@Private
+@Unstable
+public class FileSystemTimelineWriter extends TimelineWriter {
+
+  private static final Log LOG = LogFactory
+      .getLog(FileSystemTimelineWriter.class);
+
+  // This is a temporary solution. The configuration will be deleted once we
+  // have a FileSystem API to check whether the append operation is supported.
+  private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+      = YarnConfiguration.TIMELINE_SERVICE_PREFIX
+          + "entity-file.fs-support-append";
+
+  // App log directory must be readable by group so server can access logs
+  // and writable by group so it can be deleted by server
+  private static final short APP_LOG_DIR_PERMISSIONS = 0770;
+  // Logs must be readable by group so server can access them
+  private static final short FILE_LOG_PERMISSIONS = 0640;
+  private static final String DOMAIN_LOG_PREFIX = "domainlog-";
+  private static final String SUMMARY_LOG_PREFIX = "summarylog-";
+  private static final String ENTITY_LOG_PREFIX = "entitylog-";
+
+  private Path activePath = null;
+  private FileSystem fs = null;
+  private Set<String> summaryEntityTypes;
+  private ObjectMapper objMapper = null;
+  private long flushIntervalSecs;
+  private long cleanIntervalSecs;
+  private long ttl;
+  private LogFDsCache logFDsCache = null;
+  private boolean isAppendSupported;
+
+  public FileSystemTimelineWriter(Configuration conf,
+      UserGroupInformation authUgi, Client client, URI resURI)
+      throws IOException {
+    super(authUgi, client, resURI);
+
+    Configuration fsConf = new Configuration(conf);
+    fsConf.setBoolean("dfs.client.retry.policy.enabled", true);
+    String retryPolicy =
+        fsConf.get(YarnConfiguration.
+            TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC,
+          YarnConfiguration.
+              DEFAULT_TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETRY_POLICY_SPEC);
+    fsConf.set("dfs.client.retry.policy.spec", retryPolicy);
+
+    activePath = new Path(fsConf.get(
+      YarnConfiguration
+          .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+      YarnConfiguration
+          .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
+
+    String scheme = activePath.toUri().getScheme();
+    if (scheme == null) {
+      scheme = FileSystem.getDefaultUri(fsConf).getScheme();
+    }
+    if (scheme != null) {
+      String disableCacheName = String.format("fs.%s.impl.disable.cache",
+          scheme);
+      fsConf.setBoolean(disableCacheName, true);
+    }
+
+    fs = activePath.getFileSystem(fsConf);
+    if (!fs.exists(activePath)) {
+      throw new IOException(activePath + " does not exist");
+    }
+
+    summaryEntityTypes = new HashSet<String>(
+        conf.getStringCollection(YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES));
+
+    flushIntervalSecs = conf.getLong(
+        YarnConfiguration
+          .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS,
+        YarnConfiguration
+          .TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS_DEFAULT);
+
+    cleanIntervalSecs = conf.getLong(
+        YarnConfiguration
+          .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS,
+        YarnConfiguration
+          .TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS_DEFAULT);
+
+    ttl = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS,
+        YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS_DEFAULT);
+
+    logFDsCache =
+        new LogFDsCache(flushIntervalSecs, cleanIntervalSecs, ttl);
+
+    this.isAppendSupported =
+        conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
+
+    objMapper = createObjectMapper();
+
+    if (LOG.isDebugEnabled()) {
+      StringBuilder debugMSG = new StringBuilder();
+      debugMSG.append(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_FLUSH_INTERVAL_SECS
+              + "=" + flushIntervalSecs + ", " +
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_CLEAN_INTERVAL_SECS
+              + "=" + cleanIntervalSecs + ", " +
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS
+              + "=" + ttl + ", " +
+          TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+              + "=" + isAppendSupported + ", " +
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
+              + "=" + activePath);
+
+      if (summaryEntityTypes != null && !summaryEntityTypes.isEmpty()) {
+        debugMSG.append(", " + YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES
+            + " = " + summaryEntityTypes);
+      }
+      LOG.debug(debugMSG.toString());
+    }
+  }
+
+  @Override
+  public TimelinePutResponse putEntities(
+      ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
+      TimelineEntity... entities) throws IOException, YarnException {
+    if (appAttemptId == null) {
+      return putEntities(entities);
+    }
+
+    List<TimelineEntity> entitiesToDBStore = new ArrayList<TimelineEntity>();
+    List<TimelineEntity> entitiesToSummaryCache
+        = new ArrayList<TimelineEntity>();
+    List<TimelineEntity> entitiesToEntityCache
+        = new ArrayList<TimelineEntity>();
+    Path attemptDir = createAttemptDir(appAttemptId);
+
+    for (TimelineEntity entity : entities) {
+      if (summaryEntityTypes.contains(entity.getEntityType())) {
+        entitiesToSummaryCache.add(entity);
+      } else {
+        if (groupId != null) {
+          entitiesToEntityCache.add(entity);
+        } else {
+          entitiesToDBStore.add(entity);
+        }
+      }
+    }
+
+    if (!entitiesToSummaryCache.isEmpty()) {
+      Path summaryLogPath =
+          new Path(attemptDir, SUMMARY_LOG_PREFIX + appAttemptId.toString());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Writing summary log for " + appAttemptId.toString() + " to "
+            + summaryLogPath);
+      }
+      this.logFDsCache.writeSummaryEntityLogs(fs, summaryLogPath, objMapper,
+          appAttemptId, entitiesToSummaryCache, isAppendSupported);
+    }
+
+    if (!entitiesToEntityCache.isEmpty()) {
+      Path entityLogPath =
+          new Path(attemptDir, ENTITY_LOG_PREFIX + groupId.toString());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Writing entity log for " + groupId.toString() + " to "
+            + entityLogPath);
+      }
+      this.logFDsCache.writeEntityLogs(fs, entityLogPath, objMapper,
+          appAttemptId, groupId, entitiesToEntityCache, isAppendSupported);
+    }
+
+    if (!entitiesToDBStore.isEmpty()) {
+      putEntities(entitiesToDBStore.toArray(
+          new TimelineEntity[entitiesToDBStore.size()]));
+    }
+
+    return new TimelinePutResponse();
+  }
+
+  @Override
+  public void putDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException, YarnException {
+    if (appAttemptId == null) {
+      putDomain(domain);
+    } else {
+      writeDomain(appAttemptId, domain);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (this.logFDsCache != null) {
+      this.logFDsCache.close();
+    }
+  }
+
+  private ObjectMapper createObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+    mapper.setSerializationInclusion(Inclusion.NON_NULL);
+    mapper.configure(Feature.CLOSE_CLOSEABLE, false);
+    return mapper;
+  }
+
+  private Path createAttemptDir(ApplicationAttemptId appAttemptId)
+      throws IOException {
+    Path appDir = createApplicationDir(appAttemptId.getApplicationId());
+
+    Path attemptDir = new Path(appDir, appAttemptId.toString());
+    if (!fs.exists(attemptDir)) {
+      FileSystem.mkdirs(fs, attemptDir, new FsPermission(
+          APP_LOG_DIR_PERMISSIONS));
+    }
+    return attemptDir;
+  }
+
+  private Path createApplicationDir(ApplicationId appId) throws IOException {
+    Path appDir =
+        new Path(activePath, appId.toString());
+    if (!fs.exists(appDir)) {
+      FileSystem.mkdirs(fs, appDir, new FsPermission(APP_LOG_DIR_PERMISSIONS));
+    }
+    return appDir;
+  }
+
+  private void writeDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException {
+    Path domainLogPath =
+        new Path(createAttemptDir(appAttemptId), DOMAIN_LOG_PREFIX
+            + appAttemptId.toString());
+    LOG.info("Writing domains for " + appAttemptId.toString() + " to "
+        + domainLogPath);
+    this.logFDsCache.writeDomainLog(
+        fs, domainLogPath, objMapper, domain, isAppendSupported);
+  }
+
+  private static class DomainLogFD extends LogFD {
+    public DomainLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+        boolean isAppendSupported) throws IOException {
+      super(fs, logPath, objMapper, isAppendSupported);
+    }
+
+    public void writeDomain(TimelineDomain domain)
+        throws IOException {
+      getObjectMapper().writeValue(getJsonGenerator(), domain);
+      updateLastModifiedTime(System.currentTimeMillis());
+    }
+  }
+
+  private static class EntityLogFD extends LogFD {
+    public EntityLogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+        boolean isAppendSupported) throws IOException {
+      super(fs, logPath, objMapper, isAppendSupported);
+    }
+
+    public void writeEntities(List<TimelineEntity> entities)
+        throws IOException {
+      if (writerClosed()) {
+        prepareForWrite();
+      }
+      for (TimelineEntity entity : entities) {
+        getObjectMapper().writeValue(getJsonGenerator(), entity);
+      }
+      updateLastModifiedTime(System.currentTimeMillis());
+    }
+  }
+
+  private static class LogFD {
+    private FSDataOutputStream stream;
+    private ObjectMapper objMapper;
+    private JsonGenerator jsonGenerator;
+    private long lastModifiedTime;
+    private final boolean isAppendSupported;
+    private final ReentrantLock fdLock = new ReentrantLock();
+    private final FileSystem fs;
+    private final Path logPath;
+
+    public LogFD(FileSystem fs, Path logPath, ObjectMapper objMapper,
+        boolean isAppendSupported) throws IOException {
+      this.fs = fs;
+      this.logPath = logPath;
+      this.isAppendSupported = isAppendSupported;
+      this.objMapper = objMapper;
+      prepareForWrite();
+    }
+
+    public void close() {
+      if (stream != null) {
+        IOUtils.cleanup(LOG, jsonGenerator);
+        IOUtils.cleanup(LOG, stream);
+        stream = null;
+        jsonGenerator = null;
+      }
+    }
+
+    public void flush() throws IOException {
+      if (stream != null) {
+        stream.hflush();
+      }
+    }
+
+    public long getLastModifiedTime() {
+      return this.lastModifiedTime;
+    }
+
+    protected void prepareForWrite() throws IOException {
+      this.stream = createLogFileStream(fs, logPath);
+      this.jsonGenerator = new JsonFactory().createJsonGenerator(stream);
+      this.jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+      this.lastModifiedTime = System.currentTimeMillis();
+    }
+
+    protected boolean writerClosed() {
+      return stream == null;
+    }
+
+    private FSDataOutputStream createLogFileStream(FileSystem fileSystem,
+        Path logPathToCreate)
+        throws IOException {
+      FSDataOutputStream streamToCreate;
+      if (!isAppendSupported) {
+        logPathToCreate =
+            new Path(logPathToCreate.getParent(),
+              (logPathToCreate.getName() + "_" + System.currentTimeMillis()));
+      }
+      if (!fileSystem.exists(logPathToCreate)) {
+        streamToCreate = fileSystem.create(logPathToCreate, false);
+        fileSystem.setPermission(logPathToCreate,
+            new FsPermission(FILE_LOG_PERMISSIONS));
+      } else {
+        streamToCreate = fileSystem.append(logPathToCreate);
+      }
+      return streamToCreate;
+    }
+
+    public void lock() {
+      this.fdLock.lock();
+    }
+
+    public void unlock() {
+      this.fdLock.unlock();
+    }
+
+    protected JsonGenerator getJsonGenerator() {
+      return jsonGenerator;
+    }
+
+    protected ObjectMapper getObjectMapper() {
+      return objMapper;
+    }
+
+    protected void updateLastModifiedTime(long updatedTime) {
+      this.lastModifiedTime = updatedTime;
+    }
+  }
+
+  private static class LogFDsCache implements Closeable, Flushable {
+    private DomainLogFD domainLogFD;
+    private Map<ApplicationAttemptId, EntityLogFD> summanyLogFDs;
+    private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+        EntityLogFD>> entityLogFDs;
+    private Timer flushTimer;
+    private FlushTimerTask flushTimerTask;
+    private Timer cleanInActiveFDsTimer;
+    private CleanInActiveFDsTask cleanInActiveFDsTask;
+    private final long ttl;
+    private final ReentrantLock domainFDLocker = new ReentrantLock();
+    private final ReentrantLock summaryTableLocker = new ReentrantLock();
+    private final ReentrantLock entityTableLocker = new ReentrantLock();
+    private final ReentrantLock summaryTableCopyLocker = new ReentrantLock();
+    private final ReentrantLock entityTableCopyLocker = new ReentrantLock();
+    private volatile boolean serviceStopped = false;
+
+    public LogFDsCache(long flushIntervalSecs, long cleanIntervalSecs,
+        long ttl) {
+      domainLogFD = null;
+      summanyLogFDs = new HashMap<ApplicationAttemptId, EntityLogFD>();
+      entityLogFDs = new HashMap<ApplicationAttemptId,
+          HashMap<TimelineEntityGroupId, EntityLogFD>>();
+      this.flushTimer =
+          new Timer(LogFDsCache.class.getSimpleName() + "FlushTimer",
+            true);
+      this.flushTimerTask = new FlushTimerTask();
+      this.flushTimer.schedule(flushTimerTask, flushIntervalSecs * 1000,
+          flushIntervalSecs * 1000);
+
+      this.cleanInActiveFDsTimer =
+          new Timer(LogFDsCache.class.getSimpleName() +
+            "cleanInActiveFDsTimer", true);
+      this.cleanInActiveFDsTask = new CleanInActiveFDsTask();
+      this.cleanInActiveFDsTimer.schedule(cleanInActiveFDsTask,
+          cleanIntervalSecs * 1000, cleanIntervalSecs * 1000);
+      this.ttl = ttl * 1000;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      try {
+        this.domainFDLocker.lock();
+        if (domainLogFD != null) {
+          domainLogFD.flush();
+        }
+      } finally {
+        this.domainFDLocker.unlock();
+      }
+
+      flushSummaryFDMap(copySummaryLogFDs(summanyLogFDs));
+
+      flushEntityFDMap(copyEntityLogFDs(entityLogFDs));
+    }
+
+    private Map<ApplicationAttemptId, EntityLogFD> copySummaryLogFDs(
+        Map<ApplicationAttemptId, EntityLogFD> summanyLogFDsToCopy) {
+      try {
+        summaryTableCopyLocker.lock();
+        return new HashMap<ApplicationAttemptId, EntityLogFD>(
+            summanyLogFDsToCopy);
+      } finally {
+        summaryTableCopyLocker.unlock();
+      }
+    }
+
+    private Map<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+        EntityLogFD>> copyEntityLogFDs(Map<ApplicationAttemptId,
+        HashMap<TimelineEntityGroupId, EntityLogFD>> entityLogFDsToCopy) {
+      try {
+        entityTableCopyLocker.lock();
+        return new HashMap<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+            EntityLogFD>>(entityLogFDsToCopy);
+      } finally {
+        entityTableCopyLocker.unlock();
+      }
+    }
+
+    private void flushSummaryFDMap(Map<ApplicationAttemptId,
+        EntityLogFD> logFDs) throws IOException {
+      if (!logFDs.isEmpty()) {
+        for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
+            .entrySet()) {
+          EntityLogFD logFD = logFDEntry.getValue();
+          try {
+            logFD.lock();
+            logFD.flush();
+          } finally {
+            logFD.unlock();
+          }
+        }
+      }
+    }
+
+    private void flushEntityFDMap(Map<ApplicationAttemptId, HashMap<
+        TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
+      if (!logFDs.isEmpty()) {
+        for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+                 EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
+          HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
+              = logFDMapEntry.getValue();
+          for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
+              : logFDMap.entrySet()) {
+            EntityLogFD logFD = logFDEntry.getValue();
+            try {
+              logFD.lock();
+              logFD.flush();
+            } finally {
+              logFD.unlock();
+            }
+          }
+        }
+      }
+    }
+
+    private class FlushTimerTask extends TimerTask {
+      @Override
+      public void run() {
+        try {
+          flush();
+        } catch (Exception e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(e);
+          }
+        }
+      }
+    }
+
+    private void cleanInActiveFDs() {
+      long currentTimeStamp = System.currentTimeMillis();
+      try {
+        this.domainFDLocker.lock();
+        if (domainLogFD != null) {
+          if (currentTimeStamp - domainLogFD.getLastModifiedTime() >= ttl) {
+            domainLogFD.close();
+            domainLogFD = null;
+          }
+        }
+      } finally {
+        this.domainFDLocker.unlock();
+      }
+
+      cleanInActiveSummaryFDsforMap(copySummaryLogFDs(summanyLogFDs),
+          currentTimeStamp);
+
+      cleanInActiveEntityFDsforMap(copyEntityLogFDs(entityLogFDs),
+          currentTimeStamp);
+    }
+
+    private void cleanInActiveSummaryFDsforMap(
+        Map<ApplicationAttemptId, EntityLogFD> logFDs,
+        long currentTimeStamp) {
+      if (!logFDs.isEmpty()) {
+        for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry : logFDs
+            .entrySet()) {
+          EntityLogFD logFD = logFDEntry.getValue();
+          try {
+            logFD.lock();
+            if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
+              logFD.close();
+            }
+          } finally {
+            logFD.unlock();
+          }
+        }
+      }
+    }
+
+    private void cleanInActiveEntityFDsforMap(Map<ApplicationAttemptId,
+        HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs,
+        long currentTimeStamp) {
+      if (!logFDs.isEmpty()) {
+        for (Entry<ApplicationAttemptId, HashMap<
+                 TimelineEntityGroupId, EntityLogFD>> logFDMapEntry
+                : logFDs.entrySet()) {
+          HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
+              = logFDMapEntry.getValue();
+          for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
+              : logFDMap.entrySet()) {
+            EntityLogFD logFD = logFDEntry.getValue();
+            try {
+              logFD.lock();
+              if (currentTimeStamp - logFD.getLastModifiedTime() >= ttl) {
+                logFD.close();
+              }
+            } finally {
+              logFD.unlock();
+            }
+          }
+        }
+      }
+    }
+
+    private class CleanInActiveFDsTask extends TimerTask {
+      @Override
+      public void run() {
+        try {
+          cleanInActiveFDs();
+        } catch (Exception e) {
+          LOG.warn(e);
+        }
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+
+      serviceStopped = true;
+
+      flushTimer.cancel();
+      cleanInActiveFDsTimer.cancel();
+
+      try {
+        this.domainFDLocker.lock();
+        if (domainLogFD != null) {
+          domainLogFD.close();
+          domainLogFD = null;
+        }
+      } finally {
+        this.domainFDLocker.unlock();
+      }
+
+      closeSummaryFDs(summanyLogFDs);
+
+      closeEntityFDs(entityLogFDs);
+    }
+
+    private void closeEntityFDs(Map<ApplicationAttemptId,
+        HashMap<TimelineEntityGroupId, EntityLogFD>> logFDs) {
+      try {
+        entityTableLocker.lock();
+        if (!logFDs.isEmpty()) {
+          for (Entry<ApplicationAttemptId, HashMap<TimelineEntityGroupId,
+                   EntityLogFD>> logFDMapEntry : logFDs.entrySet()) {
+            HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap
+                = logFDMapEntry.getValue();
+            for (Entry<TimelineEntityGroupId, EntityLogFD> logFDEntry
+                : logFDMap.entrySet()) {
+              EntityLogFD logFD = logFDEntry.getValue();
+              try {
+                logFD.lock();
+                logFD.close();
+              } finally {
+                logFD.unlock();
+              }
+            }
+          }
+        }
+      } finally {
+        entityTableLocker.unlock();
+      }
+    }
+
+    private void closeSummaryFDs(
+        Map<ApplicationAttemptId, EntityLogFD> logFDs) {
+      try {
+        summaryTableLocker.lock();
+        if (!logFDs.isEmpty()) {
+          for (Entry<ApplicationAttemptId, EntityLogFD> logFDEntry
+              : logFDs.entrySet()) {
+            EntityLogFD logFD = logFDEntry.getValue();
+            try {
+              logFD.lock();
+              logFD.close();
+            } finally {
+              logFD.unlock();
+            }
+          }
+        }
+      } finally {
+        summaryTableLocker.unlock();
+      }
+    }
+
+    public void writeDomainLog(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, TimelineDomain domain,
+        boolean isAppendSupported) throws IOException {
+      try {
+        this.domainFDLocker.lock();
+        if (this.domainLogFD != null) {
+          this.domainLogFD.writeDomain(domain);
+        } else {
+          this.domainLogFD =
+              new DomainLogFD(fs, logPath, objMapper, isAppendSupported);
+          this.domainLogFD.writeDomain(domain);
+        }
+      } finally {
+        this.domainFDLocker.unlock();
+      }
+    }
+
+    public void writeEntityLogs(FileSystem fs, Path entityLogPath,
+        ObjectMapper objMapper, ApplicationAttemptId appAttemptId,
+        TimelineEntityGroupId groupId, List<TimelineEntity> entitiesToEntity,
+        boolean isAppendSupported) throws IOException {
+      writeEntityLogs(fs, entityLogPath, objMapper, appAttemptId,
+          groupId, entitiesToEntity, isAppendSupported, this.entityLogFDs);
+    }
+
+    private void writeEntityLogs(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, ApplicationAttemptId attemptId,
+        TimelineEntityGroupId groupId, List<TimelineEntity> entities,
+        boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
+            TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
+      HashMap<TimelineEntityGroupId, EntityLogFD> logMapFD
+          = logFDs.get(attemptId);
+      if (logMapFD != null) {
+        EntityLogFD logFD = logMapFD.get(groupId);
+        if (logFD != null) {
+          try {
+            logFD.lock();
+            if (serviceStopped) {
+              return;
+            }
+            logFD.writeEntities(entities);
+          } finally {
+            logFD.unlock();
+          }
+        } else {
+          createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
+              entities, isAppendSupported, logFDs);
+        }
+      } else {
+        createEntityFDandWrite(fs, logPath, objMapper, attemptId, groupId,
+            entities, isAppendSupported, logFDs);
+      }
+    }
+
+    private void createEntityFDandWrite(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, ApplicationAttemptId attemptId,
+        TimelineEntityGroupId groupId, List<TimelineEntity> entities,
+        boolean isAppendSupported, Map<ApplicationAttemptId, HashMap<
+            TimelineEntityGroupId, EntityLogFD>> logFDs) throws IOException {
+      try {
+        entityTableLocker.lock();
+        if (serviceStopped) {
+          return;
+        }
+        HashMap<TimelineEntityGroupId, EntityLogFD> logFDMap =
+            logFDs.get(attemptId);
+        if (logFDMap == null) {
+          logFDMap = new HashMap<TimelineEntityGroupId, EntityLogFD>();
+        }
+        EntityLogFD logFD = logFDMap.get(groupId);
+        if (logFD == null) {
+          logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
+        }
+        try {
+          logFD.lock();
+          logFD.writeEntities(entities);
+          try {
+            entityTableCopyLocker.lock();
+            logFDMap.put(groupId, logFD);
+            logFDs.put(attemptId, logFDMap);
+          } finally {
+            entityTableCopyLocker.unlock();
+          }
+        } finally {
+          logFD.unlock();
+        }
+      } finally {
+        entityTableLocker.unlock();
+      }
+    }
+
+    public void writeSummaryEntityLogs(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, ApplicationAttemptId attemptId,
+        List<TimelineEntity> entities, boolean isAppendSupported)
+        throws IOException {
+      writeSummmaryEntityLogs(fs, logPath, objMapper, attemptId, entities,
+          isAppendSupported, this.summanyLogFDs);
+    }
+
+    private void writeSummmaryEntityLogs(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, ApplicationAttemptId attemptId,
+        List<TimelineEntity> entities, boolean isAppendSupported,
+        Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
+      EntityLogFD logFD = null;
+      logFD = logFDs.get(attemptId);
+      if (logFD != null) {
+        try {
+          logFD.lock();
+          if (serviceStopped) {
+            return;
+          }
+          logFD.writeEntities(entities);
+        } finally {
+          logFD.unlock();
+        }
+      } else {
+        createSummaryFDAndWrite(fs, logPath, objMapper, attemptId, entities,
+            isAppendSupported, logFDs);
+      }
+    }
+
+    private void createSummaryFDAndWrite(FileSystem fs, Path logPath,
+        ObjectMapper objMapper, ApplicationAttemptId attemptId,
+        List<TimelineEntity> entities, boolean isAppendSupported,
+        Map<ApplicationAttemptId, EntityLogFD> logFDs) throws IOException {
+      try {
+        summaryTableLocker.lock();
+        if (serviceStopped) {
+          return;
+        }
+        EntityLogFD logFD = logFDs.get(attemptId);
+        if (logFD == null) {
+          logFD = new EntityLogFD(fs, logPath, objMapper, isAppendSupported);
+        }
+        try {
+          logFD.lock();
+          logFD.writeEntities(entities);
+          try {
+            summaryTableCopyLocker.lock();
+            logFDs.put(attemptId, logFD);
+          } finally {
+            summaryTableCopyLocker.unlock();
+          }
+        } finally {
+          logFD.unlock();
+        }
+      } finally {
+        summaryTableLocker.unlock();
+      }
+    }
+  }
+}
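
For orientation, the on-disk layout this writer produces under the active
dir (a sketch using the default path and illustrative ids; when
entity-file.fs-support-append is false, each file name instead gets a
"_<timestamp>" suffix per open):

    /tmp/entity-file-history/active/
      application_1450875851000_0001/
        appattempt_1450875851000_0001_000001/
          domainlog-appattempt_1450875851000_0001_000001
          summarylog-appattempt_1450875851000_0001_000001
          entitylog-timelineEntityGroupId_1450875851000_1_dag_0001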

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 019c7a5..195a661 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -33,8 +33,6 @@ import java.security.PrivilegedExceptionAction;
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
-import javax.ws.rs.core.MediaType;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -54,19 +52,19 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.map.ObjectMapper;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -74,7 +72,6 @@ import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.ClientRequest;
 import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.ClientConfig;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.api.client.filter.ClientFilter;
@@ -110,6 +107,9 @@ public class TimelineClientImpl extends TimelineClient {
   private URI resURI;
   private UserGroupInformation authUgi;
   private String doAsUser;
+  private Configuration configuration;
+  private float timelineServiceVersion;
+  private TimelineWriter timelineWriter;
 
   @Private
   @VisibleForTesting
@@ -254,6 +254,7 @@ public class TimelineClientImpl extends TimelineClient {
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
+    this.configuration = conf;
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     UserGroupInformation realUgi = ugi.getRealUser();
     if (realUgi != null) {
@@ -293,58 +294,48 @@ public class TimelineClientImpl extends TimelineClient {
           RESOURCE_URI_STR));
     }
     LOG.info("Timeline service address: " + resURI);
+    timelineServiceVersion =
+        conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
     super.serviceInit(conf);
   }
 
   @Override
+  protected void serviceStart() throws Exception {
+    timelineWriter = createTimelineWriter(
+        configuration, authUgi, client, resURI);
+  }
+
+  protected TimelineWriter createTimelineWriter(Configuration conf,
+      UserGroupInformation ugi, Client webClient, URI uri)
+      throws IOException {
+    if (Float.compare(this.timelineServiceVersion, 1.5f) == 0) {
+      return new FileSystemTimelineWriter(
+          conf, ugi, webClient, uri);
+    } else {
+      return new DirectTimelineWriter(ugi, webClient, uri);
+    }
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.timelineWriter != null) {
+      this.timelineWriter.close();
+    }
+    super.serviceStop();
+  }
+
+  @Override
   public TimelinePutResponse putEntities(
       TimelineEntity... entities) throws IOException, YarnException {
-    TimelineEntities entitiesContainer = new TimelineEntities();
-    for (TimelineEntity entity : entities) {
-      if (entity.getEntityId() == null || entity.getEntityType() == null) {
-        throw new YarnException("Incomplete entity without entity id/type");
-      }
-      entitiesContainer.addEntity(entity);
-    }
-    ClientResponse resp = doPosting(entitiesContainer, null);
-    return resp.getEntity(TimelinePutResponse.class);
+    return timelineWriter.putEntities(entities);
   }
 
 
   @Override
   public void putDomain(TimelineDomain domain) throws IOException,
       YarnException {
-    doPosting(domain, "domain");
-  }
-
-  private ClientResponse doPosting(final Object obj, final String path)
-      throws IOException, YarnException {
-    ClientResponse resp;
-    try {
-      resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
-        @Override
-        public ClientResponse run() throws Exception {
-          return doPostingObject(obj, path);
-        }
-      });
-    } catch (UndeclaredThrowableException e) {
-        throw new IOException(e.getCause());
-    } catch (InterruptedException ie) {
-      throw new IOException(ie);
-    }
-    if (resp == null ||
-        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
-      String msg =
-          "Failed to get the response from the timeline server.";
-      LOG.error(msg);
-      if (LOG.isDebugEnabled() && resp != null) {
-        String output = resp.getEntity(String.class);
-        LOG.debug("HTTP error code: " + resp.getStatus()
-            + " Server response : \n" + output);
-      }
-      throw new YarnException(msg);
-    }
-    return resp;
+    timelineWriter.putDomain(domain);
   }
 
   @SuppressWarnings("unchecked")
@@ -470,23 +461,6 @@ public class TimelineClientImpl extends TimelineClient {
     return connectionRetry.retryOn(tokenRetryOp);
   }
 
-  @Private
-  @VisibleForTesting
-  public ClientResponse doPostingObject(Object object, String path) {
-    WebResource webResource = client.resource(resURI);
-    if (path == null) {
-      return webResource.accept(MediaType.APPLICATION_JSON)
-          .type(MediaType.APPLICATION_JSON)
-          .post(ClientResponse.class, object);
-    } else if (path.equals("domain")) {
-      return webResource.path(path).accept(MediaType.APPLICATION_JSON)
-          .type(MediaType.APPLICATION_JSON)
-          .put(ClientResponse.class, object);
-    } else {
-      throw new YarnRuntimeException("Unknown resource type");
-    }
-  }
-
   private class TimelineURLConnectionFactory
       implements HttpURLConnectionFactory {
 
@@ -663,4 +637,34 @@ public class TimelineClientImpl extends TimelineClient {
   public UserGroupInformation getUgi() {
     return authUgi;
   }
+
+  @Override
+  public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
+      TimelineEntityGroupId groupId, TimelineEntity... entities)
+      throws IOException, YarnException {
+    if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
+      throw new YarnException(
+        "This API is not supported under current Timeline Service Version: "
+            + timelineServiceVersion);
+    }
+
+    return timelineWriter.putEntities(appAttemptId, groupId, entities);
+  }
+
+  @Override
+  public void putDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException, YarnException {
+    if (Float.compare(this.timelineServiceVersion, 1.5f) != 0) {
+      throw new YarnException(
+        "This API is not supported under current Timeline Service Version: "
+            + timelineServiceVersion);
+    }
+    timelineWriter.putDomain(appAttemptId, domain);
+  }
+
+  @Private
+  @VisibleForTesting
+  public void setTimelineWriter(TimelineWriter writer) {
+    this.timelineWriter = writer;
+  }
 }
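For context, a minimal caller-side sketch of the new version-gated API above. This is not part of the patch: the ids and the entity type are illustrative values, and it assumes a reachable timeline server configured for ATS v1.5.

// Illustrative sketch only: exercises the putEntities(ApplicationAttemptId,
// TimelineEntityGroupId, TimelineEntity...) overload added in this patch.
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineV15PutExample {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    // The new overloads throw YarnException unless the configured
    // timeline service version is exactly 1.5.
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);

    TimelineClient client = TimelineClient.createTimelineClient();
    client.init(conf);
    client.start();
    try {
      ApplicationId appId = ApplicationId.newInstance(1234L, 1);
      ApplicationAttemptId attemptId =
          ApplicationAttemptId.newInstance(appId, 1);
      TimelineEntityGroupId groupId =
          TimelineEntityGroupId.newInstance(appId, "1");

      TimelineEntity entity = new TimelineEntity();
      entity.setEntityId("entity_1");
      entity.setEntityType("example_type");
      entity.setStartTime(System.currentTimeMillis());

      // Routed to the FileSystemTimelineWriter created in serviceStart().
      client.putEntities(attemptId, groupId, entity);
    } finally {
      client.stop();
    }
  }
}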

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
new file mode 100644
index 0000000..c616e63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+/**
+ * Base writer class to write the Timeline data.
+ */
+@Private
+@Unstable
+public abstract class TimelineWriter {
+
+  private static final Log LOG = LogFactory
+      .getLog(TimelineWriter.class);
+
+  private UserGroupInformation authUgi;
+  private Client client;
+  private URI resURI;
+
+  public TimelineWriter(UserGroupInformation authUgi, Client client,
+      URI resURI) {
+    this.authUgi = authUgi;
+    this.client = client;
+    this.resURI = resURI;
+  }
+
+  public void close() throws Exception {
+    // DO NOTHING
+  }
+
+  public TimelinePutResponse putEntities(
+      TimelineEntity... entities) throws IOException, YarnException {
+    TimelineEntities entitiesContainer = new TimelineEntities();
+    for (TimelineEntity entity : entities) {
+      if (entity.getEntityId() == null || entity.getEntityType() == null) {
+        throw new YarnException("Incomplete entity without entity id/type");
+      }
+      entitiesContainer.addEntity(entity);
+    }
+    ClientResponse resp = doPosting(entitiesContainer, null);
+    return resp.getEntity(TimelinePutResponse.class);
+  }
+
+  public void putDomain(TimelineDomain domain) throws IOException,
+      YarnException {
+    doPosting(domain, "domain");
+  }
+
+  public abstract TimelinePutResponse putEntities(
+      ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
+      TimelineEntity... entities) throws IOException, YarnException;
+
+  public abstract void putDomain(ApplicationAttemptId appAttemptId,
+      TimelineDomain domain) throws IOException, YarnException;
+
+  private ClientResponse doPosting(final Object obj, final String path)
+      throws IOException, YarnException {
+    ClientResponse resp;
+    try {
+      resp = authUgi.doAs(new PrivilegedExceptionAction<ClientResponse>() {
+        @Override
+        public ClientResponse run() throws Exception {
+          return doPostingObject(obj, path);
+        }
+      });
+    } catch (UndeclaredThrowableException e) {
+      throw new IOException(e.getCause());
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+    if (resp == null ||
+        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+      String msg =
+          "Failed to get the response from the timeline server.";
+      LOG.error(msg);
+      if (LOG.isDebugEnabled() && resp != null) {
+        String output = resp.getEntity(String.class);
+        LOG.debug("HTTP error code: " + resp.getStatus()
+            + " Server response : \n" + output);
+      }
+      throw new YarnException(msg);
+    }
+    return resp;
+  }
+
+  @Private
+  @VisibleForTesting
+  public ClientResponse doPostingObject(Object object, String path) {
+    WebResource webResource = client.resource(resURI);
+    if (path == null) {
+      return webResource.accept(MediaType.APPLICATION_JSON)
+          .type(MediaType.APPLICATION_JSON)
+          .post(ClientResponse.class, object);
+    } else if (path.equals("domain")) {
+      return webResource.path(path).accept(MediaType.APPLICATION_JSON)
+          .type(MediaType.APPLICATION_JSON)
+          .put(ClientResponse.class, object);
+    } else {
+      throw new YarnRuntimeException("Unknown resource type");
+    }
+  }
+}
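As a reference for the abstract surface above, a hedged sketch of a custom subclass (hypothetical class name; DirectTimelineWriter and FileSystemTimelineWriter are the two implementations this patch actually ships):

// Hypothetical subclass for illustration only: delegates the group-aware
// calls back to the plain REST path inherited from TimelineWriter.
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;

import com.sun.jersey.api.client.Client;

public class PassThroughTimelineWriter extends TimelineWriter {

  public PassThroughTimelineWriter(UserGroupInformation authUgi,
      Client client, URI resURI) {
    super(authUgi, client, resURI);
  }

  @Override
  public TimelinePutResponse putEntities(ApplicationAttemptId appAttemptId,
      TimelineEntityGroupId groupId, TimelineEntity... entities)
      throws IOException, YarnException {
    // Ignore the grouping hints and post directly over REST.
    return putEntities(entities);
  }

  @Override
  public void putDomain(ApplicationAttemptId appAttemptId,
      TimelineDomain domain) throws IOException, YarnException {
    putDomain(domain);
  }
}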

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java
new file mode 100644
index 0000000..55b1496
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestTimelineEntityGroupId.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTimelineEntityGroupId {
+
+  @Test
+  public void testTimelineEntityGroupId() {
+    ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(1234, 2);
+    TimelineEntityGroupId group1 = TimelineEntityGroupId.newInstance(appId1, "1");
+    TimelineEntityGroupId group2 = TimelineEntityGroupId.newInstance(appId1, "2");
+    TimelineEntityGroupId group3 = TimelineEntityGroupId.newInstance(appId2, "1");
+    TimelineEntityGroupId group4 = TimelineEntityGroupId.newInstance(appId1, "1");
+
+    Assert.assertTrue(group1.equals(group4));
+    Assert.assertFalse(group1.equals(group2));
+    Assert.assertFalse(group1.equals(group3));
+
+    Assert.assertTrue(group1.compareTo(group4) == 0);
+    Assert.assertTrue(group1.compareTo(group2) < 0);
+    Assert.assertTrue(group1.compareTo(group3) < 0);
+
+    Assert.assertTrue(group1.hashCode() == group4.hashCode());
+    Assert.assertFalse(group1.hashCode() == group2.hashCode());
+    Assert.assertFalse(group1.hashCode() == group3.hashCode());
+
+    Assert.assertEquals("timelineEntityGroupId_1234_1_1", group1.toString());
+    Assert.assertEquals(TimelineEntityGroupId.fromString("timelineEntityGroupId_1234_1_1"), group1);
+  }
+}
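The assertions above pin down the string form, timelineEntityGroupId_&lt;clusterTimestamp&gt;_&lt;appSeqNumber&gt;_&lt;groupId&gt;; a minimal round-trip sketch (illustrative, mirrors the test):

// Illustrative round-trip between toString() and fromString().
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;

public class GroupIdRoundTrip {
  public static void main(String[] args) {
    ApplicationId appId = ApplicationId.newInstance(1234, 1);
    TimelineEntityGroupId groupId =
        TimelineEntityGroupId.newInstance(appId, "1");
    String s = groupId.toString();  // "timelineEntityGroupId_1234_1_1"
    // fromString() parses the same format back into an equal id.
    System.out.println(groupId.equals(TimelineEntityGroupId.fromString(s)));
  }
}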

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
index 4c74c61..39fc8de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
@@ -25,8 +25,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.net.ConnectException;
+import java.net.URI;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -37,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
@@ -46,17 +48,20 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.ClientResponse;
 
 public class TestTimelineClient {
 
   private TimelineClientImpl client;
+  private TimelineWriter spyTimelineWriter;
 
   @Before
   public void setup() {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
     client = createTimelineClient(conf);
   }
 
@@ -69,7 +74,8 @@ public class TestTimelineClient {
 
   @Test
   public void testPostEntities() throws Exception {
-    mockEntityClientResponse(client, ClientResponse.Status.OK, false, false);
+    mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK,
+      false, false);
     try {
       TimelinePutResponse response = client.putEntities(generateEntity());
       Assert.assertEquals(0, response.getErrors().size());
@@ -80,7 +86,8 @@ public class TestTimelineClient {
 
   @Test
   public void testPostEntitiesWithError() throws Exception {
-    mockEntityClientResponse(client, ClientResponse.Status.OK, true, false);
+    mockEntityClientResponse(spyTimelineWriter, ClientResponse.Status.OK, true,
+      false);
     try {
       TimelinePutResponse response = client.putEntities(generateEntity());
       Assert.assertEquals(1, response.getErrors().size());
@@ -106,8 +113,8 @@ public class TestTimelineClient {
 
   @Test
   public void testPostEntitiesNoResponse() throws Exception {
-    mockEntityClientResponse(
-        client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+    mockEntityClientResponse(spyTimelineWriter,
+      ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
     try {
       client.putEntities(generateEntity());
       Assert.fail("Exception is expected");
@@ -119,7 +126,7 @@ public class TestTimelineClient {
 
   @Test
   public void testPostEntitiesConnectionRefused() throws Exception {
-    mockEntityClientResponse(client, null, false, true);
+    mockEntityClientResponse(spyTimelineWriter, null, false, true);
     try {
       client.putEntities(generateEntity());
       Assert.fail("RuntimeException is expected");
@@ -130,7 +137,7 @@ public class TestTimelineClient {
 
   @Test
   public void testPutDomain() throws Exception {
-    mockDomainClientResponse(client, ClientResponse.Status.OK, false);
+    mockDomainClientResponse(spyTimelineWriter, ClientResponse.Status.OK, false);
     try {
       client.putDomain(generateDomain());
     } catch (YarnException e) {
@@ -140,7 +147,8 @@ public class TestTimelineClient {
 
   @Test
   public void testPutDomainNoResponse() throws Exception {
-    mockDomainClientResponse(client, ClientResponse.Status.FORBIDDEN, false);
+    mockDomainClientResponse(spyTimelineWriter,
+        ClientResponse.Status.FORBIDDEN, false);
     try {
       client.putDomain(generateDomain());
       Assert.fail("Exception is expected");
@@ -152,7 +160,7 @@ public class TestTimelineClient {
 
   @Test
   public void testPutDomainConnectionRefused() throws Exception {
-    mockDomainClientResponse(client, null, true);
+    mockDomainClientResponse(spyTimelineWriter, null, true);
     try {
       client.putDomain(generateDomain());
       Assert.fail("RuntimeException is expected");
@@ -291,15 +299,16 @@ public class TestTimelineClient {
   }
 
   private static ClientResponse mockEntityClientResponse(
-      TimelineClientImpl client, ClientResponse.Status status,
+      TimelineWriter spyTimelineWriter, ClientResponse.Status status,
       boolean hasError, boolean hasRuntimeError) {
     ClientResponse response = mock(ClientResponse.class);
     if (hasRuntimeError) {
-      doThrow(new ClientHandlerException(new ConnectException())).when(client)
-          .doPostingObject(any(TimelineEntities.class), any(String.class));
+      doThrow(new ClientHandlerException(new ConnectException())).when(
+        spyTimelineWriter).doPostingObject(
+        any(TimelineEntities.class), any(String.class));
       return response;
     }
-    doReturn(response).when(client)
+    doReturn(response).when(spyTimelineWriter)
         .doPostingObject(any(TimelineEntities.class), any(String.class));
     when(response.getClientResponseStatus()).thenReturn(status);
     TimelinePutResponse.TimelinePutError error =
@@ -316,15 +325,16 @@ public class TestTimelineClient {
   }
 
   private static ClientResponse mockDomainClientResponse(
-      TimelineClientImpl client, ClientResponse.Status status,
+      TimelineWriter spyTimelineWriter, ClientResponse.Status status,
       boolean hasRuntimeError) {
     ClientResponse response = mock(ClientResponse.class);
     if (hasRuntimeError) {
-      doThrow(new ClientHandlerException(new ConnectException())).when(client)
-          .doPostingObject(any(TimelineDomain.class), any(String.class));
+      doThrow(new ClientHandlerException(new ConnectException())).when(
+        spyTimelineWriter).doPostingObject(any(TimelineDomain.class),
+        any(String.class));
       return response;
     }
-    doReturn(response).when(client)
+    doReturn(response).when(spyTimelineWriter)
         .doPostingObject(any(TimelineDomain.class), any(String.class));
     when(response.getClientResponseStatus()).thenReturn(status);
     return response;
@@ -365,10 +375,19 @@ public class TestTimelineClient {
     return domain;
   }
 
-  private static TimelineClientImpl createTimelineClient(
+  private TimelineClientImpl createTimelineClient(
       YarnConfiguration conf) {
-    TimelineClientImpl client =
-        spy((TimelineClientImpl) TimelineClient.createTimelineClient());
+    TimelineClientImpl client = new TimelineClientImpl() {
+      @Override
+      protected TimelineWriter createTimelineWriter(Configuration conf,
+          UserGroupInformation authUgi, Client client, URI resURI)
+          throws IOException {
+        TimelineWriter timelineWriter =
+            new DirectTimelineWriter(authUgi, client, resURI);
+        spyTimelineWriter = spy(timelineWriter);
+        return spyTimelineWriter;
+      }
+    };
     client.init(conf);
     client.start();
     return client;
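Besides overriding createTimelineWriter as above, the @VisibleForTesting setTimelineWriter added in this patch lets a test inject a mock writer after start. A hedged sketch, assuming a JUnit test body with the Mockito static imports (mock, verify, times) already used in this file:

    // Not part of the patch: inject a mock TimelineWriter via the setter.
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    TimelineClientImpl client =
        (TimelineClientImpl) TimelineClient.createTimelineClient();
    client.init(conf);
    client.start();

    TimelineWriter mockWriter = mock(TimelineWriter.class);
    client.setTimelineWriter(mockWriter);

    TimelineEntity entity = new TimelineEntity();
    entity.setEntityId("entity_1");
    entity.setEntityType("test_type");
    client.putEntities(entity);
    // The plain putEntities call is delegated straight to the writer.
    verify(mockWriter, times(1)).putEntities(entity);
    client.stop();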

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
new file mode 100644
index 0000000..37eadbf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientForATS1_5.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.reset;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+
+public class TestTimelineClientForATS1_5 {
+
+  protected static Log LOG = LogFactory
+    .getLog(TestTimelineClientForATS1_5.class);
+
+  private TimelineClientImpl client;
+  private static FileContext localFS;
+  private static File localActiveDir;
+  private TimelineWriter spyTimelineWriter;
+
+  @Before
+  public void setup() throws Exception {
+    localFS = FileContext.getLocalFSFileContext();
+    localActiveDir =
+        new File("target", this.getClass().getSimpleName() + "-activeDir")
+          .getAbsoluteFile();
+    localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+    localActiveDir.mkdir();
+    LOG.info("Created activeDir in " + localActiveDir.getAbsolutePath());
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
+      localActiveDir.getAbsolutePath());
+    conf.set(
+      YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
+      "summary_type");
+    client = createTimelineClient(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (client != null) {
+      client.stop();
+    }
+    localFS.delete(new Path(localActiveDir.getAbsolutePath()), true);
+  }
+
+  @Test
+  public void testPostEntities() throws Exception {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    TimelineEntityGroupId groupId =
+        TimelineEntityGroupId.newInstance(appId, "1");
+    TimelineEntityGroupId groupId2 =
+        TimelineEntityGroupId.newInstance(appId, "2");
+    // Create two entities: one regular entity type and one summary type
+    TimelineEntity[] entities = new TimelineEntity[2];
+    entities[0] = generateEntity("entity_type");
+    entities[1] = generateEntity("summary_type");
+    try {
+      // If attemptId is null, fall back to the original putEntities call,
+      // which saves the entities into the configured LevelDB store.
+      client.putEntities(null, null, entities);
+      verify(spyTimelineWriter, times(1)).putEntities(entities);
+      reset(spyTimelineWriter);
+
+      // If attemptId is specified but groupId is null, entities of a regular
+      // type fall back to the original putEntities call, while entities of
+      // the summary type are written into the FileSystem store.
+      ApplicationAttemptId attemptId1 =
+          ApplicationAttemptId.newInstance(appId, 1);
+      client.putEntities(attemptId1, null, entities);
+      TimelineEntity[] entityTDB = new TimelineEntity[1];
+      entityTDB[0] = entities[0];
+      verify(spyTimelineWriter, times(1)).putEntities(entityTDB);
+      Assert.assertTrue(localFS.util().exists(
+        new Path(getAppAttemptDir(attemptId1), "summarylog-"
+            + attemptId1.toString())));
+      reset(spyTimelineWriter);
+
+      // If both attemptId and groupId are specified, the entities are saved
+      // into the FileSystem store instead of the LevelDB store.
+      ApplicationAttemptId attemptId2 =
+          ApplicationAttemptId.newInstance(appId, 2);
+      client.putEntities(attemptId2, groupId, entities);
+      client.putEntities(attemptId2, groupId2, entities);
+      verify(spyTimelineWriter, times(0)).putEntities(
+        any(TimelineEntity[].class));
+      Assert.assertTrue(localFS.util().exists(
+        new Path(getAppAttemptDir(attemptId2), "summarylog-"
+            + attemptId2.toString())));
+      Assert.assertTrue(localFS.util().exists(
+        new Path(getAppAttemptDir(attemptId2), "entitylog-"
+            + groupId.toString())));
+      Assert.assertTrue(localFS.util().exists(
+        new Path(getAppAttemptDir(attemptId2), "entitylog-"
+            + groupId2.toString())));
+      reset(spyTimelineWriter);
+    } catch (Exception e) {
+      Assert.fail("Exception is not expected. " + e);
+    }
+  }
+
+  @Test
+  public void testPutDomain() {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId attemptId1 =
+        ApplicationAttemptId.newInstance(appId, 1);
+    try {
+      TimelineDomain domain = generateDomain();
+
+      client.putDomain(null, domain);
+      verify(spyTimelineWriter, times(1)).putDomain(domain);
+      reset(spyTimelineWriter);
+
+      client.putDomain(attemptId1, domain);
+      verify(spyTimelineWriter, times(0)).putDomain(domain);
+      Assert.assertTrue(localFS.util().exists(
+        new Path(getAppAttemptDir(attemptId1), "domainlog-"
+            + attemptId1.toString())));
+      reset(spyTimelineWriter);
+    } catch (Exception e) {
+      Assert.fail("Exception is not expected." + e);
+    }
+  }
+
+  private Path getAppAttemptDir(ApplicationAttemptId appAttemptId) {
+    Path appDir =
+        new Path(localActiveDir.getAbsolutePath(), appAttemptId
+          .getApplicationId().toString());
+    Path attemptDir = new Path(appDir, appAttemptId.toString());
+    return attemptDir;
+  }
+
+  private static TimelineEntity generateEntity(String type) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId("entity id");
+    entity.setEntityType(type);
+    entity.setStartTime(System.currentTimeMillis());
+    return entity;
+  }
+
+  private static TimelineDomain generateDomain() {
+    TimelineDomain domain = new TimelineDomain();
+    domain.setId("namesapce id");
+    domain.setDescription("domain description");
+    domain.setOwner("domain owner");
+    domain.setReaders("domain_reader");
+    domain.setWriters("domain_writer");
+    domain.setCreatedTime(0L);
+    domain.setModifiedTime(1L);
+    return domain;
+  }
+
+  private TimelineClientImpl createTimelineClient(YarnConfiguration conf) {
+    TimelineClientImpl client = new TimelineClientImpl() {
+      @Override
+      protected TimelineWriter createTimelineWriter(Configuration conf,
+          UserGroupInformation authUgi, Client client, URI resURI)
+          throws IOException {
+        TimelineWriter timelineWriter =
+            new FileSystemTimelineWriter(conf, authUgi, client, resURI) {
+              @Override
+              public ClientResponse doPostingObject(Object object, String path) {
+                ClientResponse response = mock(ClientResponse.class);
+                when(response.getClientResponseStatus()).thenReturn(
+                  ClientResponse.Status.OK);
+                return response;
+              }
+            };
+        spyTimelineWriter = spy(timelineWriter);
+        return spyTimelineWriter;
+      }
+    };
+
+    client.init(conf);
+    client.start();
+    return client;
+  }
+}
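Reconstructed from the assertions in this test (getAppAttemptDir plus the asserted file-name prefixes), the on-disk layout the FileSystemTimelineWriter produces under the configured active directory looks like:

    <active-dir>/<applicationId>/<appAttemptId>/summarylog-<appAttemptId>
    <active-dir>/<applicationId>/<appAttemptId>/entitylog-<groupId>
    <active-dir>/<applicationId>/<appAttemptId>/domainlog-<appAttemptId>

where <active-dir> is the value of TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR set in setup().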

http://git-wip-us.apache.org/repos/asf/hadoop/blob/630b637f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
index 5cb1baa..46d5b6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServicesWithSSL.java
@@ -19,15 +19,20 @@
 package org.apache.hadoop.yarn.server.timeline.webapp;
 
 import java.io.File;
+import java.io.IOException;
+import java.net.URI;
 import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
@@ -39,6 +44,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 
 public class TestTimelineWebServicesWithSSL {
@@ -60,6 +66,7 @@ public class TestTimelineWebServicesWithSSL {
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
         MemoryTimelineStore.class, TimelineStore.class);
     conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY, "HTTPS_ONLY");
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
 
     File base = new File(BASEDIR);
     FileUtil.fullyDelete(base);
@@ -123,11 +130,17 @@ public class TestTimelineWebServicesWithSSL {
     private ClientResponse resp;
 
     @Override
-    public ClientResponse doPostingObject(Object obj, String path) {
-      resp = super.doPostingObject(obj, path);
-      return resp;
+    protected TimelineWriter createTimelineWriter(Configuration conf,
+        UserGroupInformation authUgi, Client client, URI resURI)
+            throws IOException {
+      return new DirectTimelineWriter(authUgi, client, resURI) {
+        @Override
+        public ClientResponse doPostingObject(Object obj, String path) {
+          resp = super.doPostingObject(obj, path);
+          return resp;
+        }
+      };
     }
-
   }
 
 }