Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/03/14 16:16:15 UTC

hadoop git commit: YARN-4545. Allow YARN distributed shell to use ATS v1.5 APIs. Li Lu via junping_du

Repository: hadoop
Updated Branches:
  refs/heads/trunk 658ee95ff -> f291d82cd


YARN-4545. Allow YARN distributed shell to use ATS v1.5 APIs. Li Lu via junping_du


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f291d82c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f291d82c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f291d82c

Branch: refs/heads/trunk
Commit: f291d82cd49c04a81380bc45c97c279d791b571c
Parents: 658ee95
Author: Junping Du <ju...@apache.org>
Authored: Mon Mar 14 08:28:38 2016 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Mon Mar 14 08:28:38 2016 -0700

----------------------------------------------------------------------
 hadoop-project/pom.xml                          |  13 ++
 .../pom.xml                                     |  21 ++++
 .../distributedshell/ApplicationMaster.java     |  68 +++++++----
 .../DistributedShellTimelinePlugin.java         |  79 ++++++++++++
 .../distributedshell/package-info.java          |  19 +++
 .../distributedshell/TestDistributedShell.java  | 120 ++++++++++++++++---
 .../yarn/util/timeline/TimelineUtils.java       |  35 ++++++
 .../hadoop/yarn/server/MiniYARNCluster.java     |  10 +-
 .../yarn/server/timeline/TimelineVersion.java   |  31 +++++
 .../server/timeline/TimelineVersionWatcher.java |  47 ++++++++
 .../pom.xml                                     |  16 +++
 .../server/timeline/PluginStoreTestUtils.java   |  51 +++++++-
 12 files changed, 469 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index f23b46e..3362c11 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -298,6 +298,19 @@
 
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+        <type>test-jar</type>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
         <version>${project.version}</version>
         <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
index 09a56ea..c118603 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
@@ -121,6 +121,27 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timeline-pluginstorage</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 95dbddc..0f82903 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
@@ -99,6 +100,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -277,6 +279,9 @@ public class ApplicationMaster {
   // Timeline Client
   @VisibleForTesting
   TimelineClient timelineClient;
+  static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
+  static final String APPID_TIMELINE_FILTER_NAME = "appId";
+  static final String USER_TIMELINE_FILTER_NAME = "user";
 
   private final String linux_bash_command = "bash";
   private final String windows_command = "cmd /c";
@@ -904,7 +909,7 @@ public class ApplicationMaster {
         applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
       }
       if(applicationMaster.timelineClient != null) {
-        ApplicationMaster.publishContainerStartEvent(
+        applicationMaster.publishContainerStartEvent(
             applicationMaster.timelineClient, container,
             applicationMaster.domainId, applicationMaster.appSubmitterUgi);
       }
@@ -1120,15 +1125,17 @@ public class ApplicationMaster {
       org.apache.commons.io.IOUtils.closeQuietly(ds);
     }
   }
-  
-  private static void publishContainerStartEvent(
-      final TimelineClient timelineClient, Container container, String domainId,
-      UserGroupInformation ugi) {
+
+  private void publishContainerStartEvent(
+      final TimelineClient timelineClient, final Container container,
+      String domainId, UserGroupInformation ugi) {
     final TimelineEntity entity = new TimelineEntity();
     entity.setEntityId(container.getId().toString());
     entity.setEntityType(DSEntity.DS_CONTAINER.toString());
     entity.setDomainId(domainId);
-    entity.addPrimaryFilter("user", ugi.getShortUserName());
+    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
+    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId()
+        .getApplicationAttemptId().getApplicationId().toString());
     TimelineEvent event = new TimelineEvent();
     event.setTimestamp(System.currentTimeMillis());
     event.setEventType(DSEvent.DS_CONTAINER_START.toString());
@@ -1137,28 +1144,27 @@ public class ApplicationMaster {
     entity.addEvent(event);
 
     try {
-      ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
-        @Override
-        public TimelinePutResponse run() throws Exception {
-          return processTimelineResponseErrors(
-              timelineClient.putEntities(entity));
-        }
-      });
-    } catch (Exception e) {
+      processTimelineResponseErrors(
+          putContainerEntity(timelineClient,
+              container.getId().getApplicationAttemptId(),
+              entity));
+    } catch (YarnException | IOException e) {
       LOG.error("Container start event could not be published for "
-          + container.getId().toString(),
-          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+          + container.getId().toString(), e);
     }
   }
 
-  private static void publishContainerEndEvent(
+  private void publishContainerEndEvent(
       final TimelineClient timelineClient, ContainerStatus container,
       String domainId, UserGroupInformation ugi) {
     final TimelineEntity entity = new TimelineEntity();
     entity.setEntityId(container.getContainerId().toString());
     entity.setEntityType(DSEntity.DS_CONTAINER.toString());
     entity.setDomainId(domainId);
-    entity.addPrimaryFilter("user", ugi.getShortUserName());
+    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
+    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
+        container.getContainerId().getApplicationAttemptId()
+            .getApplicationId().toString());
     TimelineEvent event = new TimelineEvent();
     event.setTimestamp(System.currentTimeMillis());
     event.setEventType(DSEvent.DS_CONTAINER_END.toString());
@@ -1166,22 +1172,38 @@ public class ApplicationMaster {
     event.addEventInfo("Exit Status", container.getExitStatus());
     entity.addEvent(event);
     try {
-      TimelinePutResponse response = timelineClient.putEntities(entity);
-      processTimelineResponseErrors(response);
+      processTimelineResponseErrors(
+          putContainerEntity(timelineClient,
+              container.getContainerId().getApplicationAttemptId(),
+              entity));
     } catch (YarnException | IOException e) {
       LOG.error("Container end event could not be published for "
           + container.getContainerId().toString(), e);
     }
   }
 
-  private static void publishApplicationAttemptEvent(
+  private TimelinePutResponse putContainerEntity(
+      TimelineClient timelineClient, ApplicationAttemptId currAttemptId,
+      TimelineEntity entity)
+      throws YarnException, IOException {
+    if (TimelineUtils.timelineServiceV1_5Enabled(conf)) {
+      TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
+          currAttemptId.getApplicationId(),
+          CONTAINER_ENTITY_GROUP_ID);
+      return timelineClient.putEntities(currAttemptId, groupId, entity);
+    } else {
+      return timelineClient.putEntities(entity);
+    }
+  }
+
+  private void publishApplicationAttemptEvent(
       final TimelineClient timelineClient, String appAttemptId,
       DSEvent appEvent, String domainId, UserGroupInformation ugi) {
     final TimelineEntity entity = new TimelineEntity();
     entity.setEntityId(appAttemptId);
     entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
     entity.setDomainId(domainId);
-    entity.addPrimaryFilter("user", ugi.getShortUserName());
+    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
     TimelineEvent event = new TimelineEvent();
     event.setEventType(appEvent.toString());
     event.setTimestamp(System.currentTimeMillis());
@@ -1197,7 +1219,7 @@ public class ApplicationMaster {
     }
   }
 
-  private static TimelinePutResponse processTimelineResponseErrors(
+  private TimelinePutResponse processTimelineResponseErrors(
       TimelinePutResponse response) {
     List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
     if (errors.size() == 0) {
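
Taken out of diff context, the new publishing path in the AM boils down to the following. This is a minimal sketch, not part of the patch: the class and variable names are illustrative, and conf, client, attemptId and entity are assumed to be created elsewhere (e.g. via TimelineClient.createTimelineClient()).

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;

public class TimelinePutSketch {
  /** Publish one entity, using the v1.5 grouped call when it is enabled. */
  static TimelinePutResponse putEntity(YarnConfiguration conf,
      TimelineClient client, ApplicationAttemptId attemptId,
      TimelineEntity entity) throws IOException, YarnException {
    if (TimelineUtils.timelineServiceV1_5Enabled(conf)) {
      // ATS v1.5: entities are written under a per-application entity group.
      TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
          attemptId.getApplicationId(), "CONTAINERS");
      return client.putEntities(attemptId, groupId, entity);
    }
    // ATS v1.0: plain put, no grouping.
    return client.putEntities(entity);
  }
}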

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java
new file mode 100644
index 0000000..55fbd60
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellTimelinePlugin.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineEntityGroupPlugin;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * Timeline v1.5 reader plugin for YARN distributed shell. It translates an
+ * incoming getEntity request to a set of related timeline entity groups, via
+ * the information provided in the primary filter or entity id field.
+ */
+public class DistributedShellTimelinePlugin extends TimelineEntityGroupPlugin {
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters) {
+    if (ApplicationMaster.DSEntity.DS_CONTAINER.toString().equals(entityType)) {
+      if (primaryFilter == null) {
+        return null;
+      }
+      return toEntityGroupId(primaryFilter.getValue().toString());
+    }
+    return null;
+  }
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
+      String entityType) {
+    if (ApplicationMaster.DSEntity.DS_CONTAINER.toString().equals(entityId)) {
+      ContainerId containerId = ConverterUtils.toContainerId(entityId);
+      ApplicationId appId = containerId.getApplicationAttemptId()
+          .getApplicationId();
+      return toEntityGroupId(appId.toString());
+    }
+    return null;
+  }
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
+      SortedSet<String> entityIds, Set<String> eventTypes) {
+    // Right now this method is not used by TimelineEntityGroupPlugin
+    return null;
+  }
+
+  private Set<TimelineEntityGroupId> toEntityGroupId(String strAppId) {
+    ApplicationId appId = ConverterUtils.toApplicationId(strAppId);
+    TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
+        appId, ApplicationMaster.CONTAINER_ENTITY_GROUP_ID);
+    Set<TimelineEntityGroupId> result = new HashSet<>();
+    result.add(groupId);
+    return result;
+  }
+}
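
The reader side works in the opposite direction: given an "appId" primary filter (or a container entity id), the plugin tells the v1.5 entity group store which group to scan. Below is a small illustrative exercise of the plugin, not part of the patch; the application id value is made up, and in a real deployment the class is registered through yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes, as the test changes later in this commit do.

import java.util.Set;

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
import org.apache.hadoop.yarn.applications.distributedshell.DistributedShellTimelinePlugin;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;

public class PluginLookupSketch {
  public static void main(String[] args) {
    DistributedShellTimelinePlugin plugin = new DistributedShellTimelinePlugin();
    // Map an "appId" primary filter to the application's CONTAINERS group,
    // the way the v1.5 store would when serving a getEntities query.
    Set<TimelineEntityGroupId> groups = plugin.getTimelineEntityGroupId(
        ApplicationMaster.DSEntity.DS_CONTAINER.toString(),
        new NameValuePair("appId", "application_1458000000000_0001"),
        null);
    System.out.println(groups);  // expect one group id for that application
  }
}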

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/package-info.java
new file mode 100644
index 0000000..299a286
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 3197875..6536050 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -36,12 +36,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@@ -50,29 +57,50 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
+import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 public class TestDistributedShell {
 
   private static final Log LOG =
       LogFactory.getLog(TestDistributedShell.class);
 
-  protected MiniYARNCluster yarnCluster = null;  
+  protected MiniYARNCluster yarnCluster = null;
+  protected MiniDFSCluster hdfsCluster = null;
+  private FileSystem fs = null;
   protected YarnConfiguration conf = null;
   private static final int NUM_NMS = 1;
+  private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
 
   protected final static String APPMASTER_JAR =
       JarFinder.getJar(ApplicationMaster.class);
 
+  @Rule
+  public TimelineVersionWatcher timelineVersionWatcher
+      = new TimelineVersionWatcher();
+  @Rule
+  public Timeout globalTimeout = new Timeout(90000);
+
   @Before
   public void setup() throws Exception {
-    setupInternal(NUM_NMS);
+    setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion());
   }
 
   protected void setupInternal(int numNodeManager) throws Exception {
+    setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION);
+  }
+
+  private void setupInternal(int numNodeManager, float timelineVersion)
+      throws Exception {
 
     LOG.info("Starting up YARN cluster");
     
@@ -84,6 +112,26 @@ public class TestDistributedShell {
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     conf.set("mapreduce.jobhistory.address",
         "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
+
+    // ATS version specific settings
+    if (timelineVersion == 1.0f) {
+      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+      conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+          CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
+    } else if (timelineVersion == 1.5f) {
+      if (hdfsCluster == null) {
+        HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+        hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
+            .numDataNodes(1).build();
+      }
+      fs = hdfsCluster.getFileSystem();
+      PluginStoreTestUtils.prepareFileSystemForPluginStore(fs);
+      PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster);
+      conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
+          DistributedShellTimelinePlugin.class.getName());
+    } else {
+      Assert.fail("Wrong timeline version number: " + timelineVersion);
+    }
     
     if (yarnCluster == null) {
       yarnCluster =
@@ -138,6 +186,13 @@ public class TestDistributedShell {
         yarnCluster = null;
       }
     }
+    if (hdfsCluster != null) {
+      try {
+        hdfsCluster.shutdown();
+      } finally {
+        hdfsCluster = null;
+      }
+    }
     FileContext fsContext = FileContext.getLocalFSFileContext();
     fsContext
         .delete(
@@ -146,16 +201,28 @@ public class TestDistributedShell {
             true);
   }
   
-  @Test(timeout=90000)
+  @Test
   public void testDSShellWithDomain() throws Exception {
     testDSShell(true);
   }
 
-  @Test(timeout=90000)
+  @Test
   public void testDSShellWithoutDomain() throws Exception {
     testDSShell(false);
   }
 
+  @Test
+  @TimelineVersion(1.5f)
+  public void testDSShellWithoutDomainV1_5() throws Exception {
+    testDSShell(false);
+  }
+
+  @Test
+  @TimelineVersion(1.5f)
+  public void testDSShellWithDomainV1_5() throws Exception {
+    testDSShell(true);
+  }
+
   public void testDSShell(boolean haveDomain) throws Exception {
     String[] args = {
         "--jar",
@@ -239,6 +306,24 @@ public class TestDistributedShell {
     LOG.info("Client run completed. Result=" + result);
     Assert.assertTrue(result.get());
 
+    if (timelineVersionWatcher.getTimelineVersion() == 1.5f) {
+      long scanInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
+      );
+      Path doneDir = new Path(
+          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
+      );
+      // Wait till the data is moved to done dir, or timeout and fail
+      while (true) {
+        RemoteIterator<FileStatus> iterApps = fs.listStatusIterator(doneDir);
+        if (iterApps.hasNext()) {
+          break;
+        }
+        Thread.sleep(scanInterval * 2);
+      }
+    }
+
     TimelineDomain domain = null;
     if (haveDomain) {
       domain = yarnCluster.getApplicationHistoryServer()
@@ -265,11 +350,18 @@ public class TestDistributedShell {
       Assert.assertEquals("DEFAULT",
           entitiesAttempts.getEntities().get(0).getDomainId());
     }
+    String currAttemptEntityId
+        = entitiesAttempts.getEntities().get(0).getEntityId();
+    ApplicationAttemptId attemptId
+        = ConverterUtils.toApplicationAttemptId(currAttemptEntityId);
+    NameValuePair primaryFilter = new NameValuePair(
+        ApplicationMaster.APPID_TIMELINE_FILTER_NAME,
+        attemptId.getApplicationId().toString());
     TimelineEntities entities = yarnCluster
         .getApplicationHistoryServer()
         .getTimelineStore()
         .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
-            null, null, null, null, null, null, null, null);
+            null, null, null, null, primaryFilter, null, null, null);
     Assert.assertNotNull(entities);
     Assert.assertEquals(2, entities.getEntities().size());
     Assert.assertEquals(entities.getEntities().get(0).getEntityType()
@@ -341,7 +433,7 @@ public class TestDistributedShell {
 
   }
 
-  @Test(timeout=90000)
+  @Test
   public void testDSRestartWithPreviousRunningContainers() throws Exception {
     String[] args = {
         "--jar",
@@ -376,7 +468,7 @@ public class TestDistributedShell {
    * how many attempt failures for previous 2.5 seconds.
    * The application is expected to be successful.
    */
-  @Test(timeout=90000)
+  @Test
   public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
     String[] args = {
         "--jar",
@@ -414,7 +506,7 @@ public class TestDistributedShell {
   * how many attempt failures for previous 15 seconds.
   * The application is expected to fail.
    */
-  @Test(timeout=90000)
+  @Test
   public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
     String[] args = {
         "--jar",
@@ -446,7 +538,7 @@ public class TestDistributedShell {
       Assert.assertFalse(result);
     }
 
-  @Test(timeout=90000)
+  @Test
   public void testDSShellWithCustomLogPropertyFile() throws Exception {
     final File basedir =
         new File("target", TestDistributedShell.class.getName());
@@ -541,7 +633,7 @@ public class TestDistributedShell {
     verifyContainerLog(2, expectedContent, false, "");
   }
 
-  @Test(timeout=90000)
+  @Test
   public void testDSShellWithMultipleArgs() throws Exception {
     String[] args = {
         "--jar",
@@ -575,7 +667,7 @@ public class TestDistributedShell {
     verifyContainerLog(4, expectedContent, false, "");
   }
 
-  @Test(timeout=90000)
+  @Test
   public void testDSShellWithShellScript() throws Exception {
     final File basedir =
         new File("target", TestDistributedShell.class.getName());
@@ -623,7 +715,7 @@ public class TestDistributedShell {
     verifyContainerLog(1, expectedContent, false, "");
   }
 
-  @Test(timeout=90000)
+  @Test
   public void testDSShellWithInvalidArgs() throws Exception {
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
 
@@ -785,7 +877,7 @@ public class TestDistributedShell {
     }
   }
 
-  @Test(timeout=90000)
+  @Test
   public void testContainerLaunchFailureHandling() throws Exception {
     String[] args = {
       "--jar",
@@ -813,7 +905,7 @@ public class TestDistributedShell {
 
   }
 
-  @Test(timeout=90000)
+  @Test
   public void testDebugFlag() throws Exception {
     String[] args = {
         "--jar",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
index 4f838e6..a794e97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
@@ -86,6 +86,41 @@ public class TimelineUtils {
     }
   }
 
+  /**
+   * Returns whether the timeline service is enabled via configuration.
+   *
+   * @param conf the configuration
+   * @return whether the timeline service is enabled.
+   */
+  public static boolean timelineServiceEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+  }
+
+  /**
+   * Returns the timeline service version. It does not check whether the
+   * timeline service itself is enabled.
+   *
+   * @param conf the configuration
+   * @return the timeline service version as a float.
+   */
+  public static float getTimelineServiceVersion(Configuration conf) {
+    return conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+  }
+
+  /**
+   * Returns whether the timeline service v.1.5 is enabled via configuration.
+   *
+   * @param conf the configuration
+   * @return whether the timeline service v.1.5 is enabled. V.1.5 refers to a
+   * version equal to 1.5.
+   */
+  public static boolean timelineServiceV1_5Enabled(Configuration conf) {
+    return timelineServiceEnabled(conf) &&
+        Math.abs(getTimelineServiceVersion(conf) - 1.5) < 0.00001;
+  }
+
   public static TimelineAbout createTimelineAbout(String about) {
     TimelineAbout tsInfo = new TimelineAbout(about);
     tsInfo.setHadoopBuildVersion(VersionInfo.getBuildVersion());
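
A quick illustration of how these helpers behave, assuming a YarnConfiguration (this snippet is not part of the patch): v1.5 is considered enabled only when the timeline service itself is enabled and the configured version is (approximately) 1.5.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;

public class TimelineVersionCheckSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
    System.out.println(TimelineUtils.getTimelineServiceVersion(conf)); // 1.5
    System.out.println(TimelineUtils.timelineServiceV1_5Enabled(conf)); // true

    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
    System.out.println(TimelineUtils.timelineServiceV1_5Enabled(conf)); // false
  }
}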

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 630b7ef..024adc6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
@@ -76,6 +75,7 @@ import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -749,8 +749,12 @@ public class MiniYARNCluster extends CompositeService {
       appHistoryServer = new ApplicationHistoryServer();
       conf.setClass(YarnConfiguration.APPLICATION_HISTORY_STORE,
           MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
-      conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
-          MemoryTimelineStore.class, TimelineStore.class);
+      // Only set memory timeline store if timeline v1.5 is not enabled.
+      // Otherwise, the caller is free to choose the storage implementation.
+      if (!TimelineUtils.timelineServiceV1_5Enabled(conf)) {
+        conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
+            MemoryTimelineStore.class, TimelineStore.class);
+      }
       conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
           MemoryTimelineStateStore.class, TimelineStateStore.class);
       if (!useFixedPorts) {
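
With this change, a test can select the v1.5 plugin store itself before starting the mini cluster, and MiniYARNCluster will no longer override it. A rough sketch of that setup, not from the patch (the FS store additionally needs fs.defaultFS and its active/done directories prepared, which the patch handles via PluginStoreTestUtils against a MiniDFSCluster):

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore;

public class MiniClusterV15Sketch {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
    // Chosen by the test; MiniYARNCluster leaves it alone when v1.5 is enabled.
    conf.set(YarnConfiguration.TIMELINE_SERVICE_STORE,
        EntityGroupFSTimelineStore.class.getName());

    MiniYARNCluster cluster = new MiniYARNCluster("v15-sketch", 1, 1, 1, 1);
    cluster.init(conf);
    cluster.start();
    // ... submit an application and query the timeline store here ...
    cluster.stop();
  }
}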

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersion.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersion.java
new file mode 100644
index 0000000..57439de
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersion.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(value = RetentionPolicy.RUNTIME)
+@Target(value = {ElementType.METHOD})
+public @interface TimelineVersion {
+  float value() default TimelineVersionWatcher.DEFAULT_TIMELINE_VERSION;
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersionWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersionWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersionWatcher.java
new file mode 100644
index 0000000..b00f13a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineVersionWatcher.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TimelineVersionWatcher extends TestWatcher {
+  static final float DEFAULT_TIMELINE_VERSION = 1.0f;
+  private TimelineVersion version;
+
+  @Override
+  protected void starting(Description description) {
+    version = description.getAnnotation(TimelineVersion.class);
+  }
+
+  /**
+   * @return the timeline server version for the current test (v1.0 by
+   * default)
+   */
+  public float getTimelineVersion() {
+    if(version == null) {
+      return DEFAULT_TIMELINE_VERSION;
+    }
+    return version.value();
+  }
+}
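
Together, the annotation and the rule let each test method declare which timeline version it should run against. A minimal usage example (illustrative, not part of the patch):

import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class TimelineVersionRuleExample {
  @Rule
  public TimelineVersionWatcher watcher = new TimelineVersionWatcher();

  @Test
  public void defaultsToV1() {
    // No annotation: the watcher reports the default version (1.0).
    Assert.assertEquals(1.0f, watcher.getTimelineVersion(), 0.0001f);
  }

  @Test
  @TimelineVersion(1.5f)
  public void runsAgainstV15() {
    // The rule picks the value up from the method annotation.
    Assert.assertEquals(1.5f, watcher.getTimelineVersion(), 0.0001f);
  }
}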

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
index 71f76d3..1dd301a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/pom.xml
@@ -137,4 +137,20 @@
       <artifactId>jackson-databind</artifactId>
     </dependency>
   </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f291d82c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
index f529b59..c2c4101 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
@@ -18,13 +18,16 @@
 package org.apache.hadoop.yarn.server.timeline;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
 import org.codehaus.jackson.JsonFactory;
@@ -48,7 +51,53 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-class PluginStoreTestUtils {
+/**
+ * Utility methods related to the ATS v1.5 plugin storage tests.
+ */
+public class PluginStoreTestUtils {
+
+  /**
+   * For a given file system, set up the directories needed to test the plugin storage.
+   *
+   * @param fs a {@link FileSystem} object that the plugin storage will work with
+   * @return the file system, ready for plugin storage tests.
+   * @throws IOException
+   */
+  public static FileSystem prepareFileSystemForPluginStore(FileSystem fs)
+      throws IOException {
+    Path activeDir = new Path(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT
+    );
+    Path doneDir = new Path(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
+    );
+
+    fs.mkdirs(activeDir);
+    fs.mkdirs(doneDir);
+    return fs;
+  }
+
+  /**
+   * Prepare configuration for plugin tests. This method will also add the mini
+   * DFS cluster's info to the configuration.
+   * Note: the test program needs to set up the reader plugin by itself.
+   *
+   * @param conf
+   * @param dfsCluster
+   * @return the modified configuration
+   */
+  public static YarnConfiguration prepareConfiguration(YarnConfiguration conf,
+      MiniDFSCluster dfsCluster) {
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+        dfsCluster.getURI().toString());
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
+    conf.setLong(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
+        1);
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_STORE,
+        EntityGroupFSTimelineStore.class.getName());
+    return conf;
+  }
 
   static FSDataOutputStream createLogFile(Path logPath, FileSystem fs)
       throws IOException {