You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/01/27 19:44:47 UTC

hadoop git commit: YARN-3030. Set up TS aggregator with basic request serving structure and lifecycle. Contributed by Sangjin Lee.

Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 3cd9e8dd1 -> f26941b39


YARN-3030. Set up TS aggregator with basic request serving structure and lifecycle. Contributed by Sangjin Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f26941b3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f26941b3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f26941b3

Branch: refs/heads/YARN-2928
Commit: f26941b39028ac30c77547e2be2d657bb5bf044a
Parents: 3cd9e8d
Author: Zhijie Shen <zj...@apache.org>
Authored: Tue Jan 27 10:43:12 2015 -0800
Committer: Zhijie Shen <zj...@apache.org>
Committed: Tue Jan 27 10:43:12 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop-yarn-server-nodemanager/pom.xml      |   5 +
 .../server/nodemanager/webapp/WebServer.java    |  10 +-
 .../hadoop-yarn-server-timelineservice/pom.xml  |  54 +++-
 .../timelineservice/TimelineAggregator.java     |  23 --
 .../aggregator/AppLevelAggregatorService.java   |  57 +++++
 .../aggregator/AppLevelServiceManager.java      | 136 ++++++++++
 .../AppLevelServiceManagerProvider.java         |  33 +++
 .../aggregator/BaseAggregatorService.java       | 104 ++++++++
 .../aggregator/PerNodeAggregatorServer.java     | 252 +++++++++++++++++++
 .../aggregator/PerNodeAggregatorWebService.java | 168 +++++++++++++
 .../timelineservice/TestTimelineAggregator.java |  23 --
 .../TestAppLevelAggregatorService.java          |  23 ++
 .../aggregator/TestAppLevelServiceManager.java  | 102 ++++++++
 .../aggregator/TestBaseAggregatorService.java   |  23 ++
 .../aggregator/TestPerNodeAggregatorServer.java | 149 +++++++++++
 16 files changed, 1116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4436fd2..ee4e6d9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -8,6 +8,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
 
     YARN-3063. Bootstrapping TimelineServer next generation module. (zjshen)
 
+    YARN-3030. Set up TS aggregator with basic request serving structure and
+    lifecycle. (Sangjin Lee via zjshen)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
index b1efa5f..26a33b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
@@ -53,6 +53,11 @@
       <artifactId>hadoop-yarn-api</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
index ca2f239..eae8889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -30,6 +29,9 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManager;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManagerProvider;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorWebService;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
@@ -112,6 +114,12 @@ public class WebServer extends AbstractService {
       bind(NMWebServices.class);
       bind(GenericExceptionHandler.class);
       bind(JAXBContextResolver.class);
+      // host the timeline service aggregator web service temporarily
+      // (see YARN-3087)
+      bind(PerNodeAggregatorWebService.class);
+      // bind to the global singleton instance
+      bind(AppLevelServiceManager.class).
+          toProvider(AppLevelServiceManagerProvider.class);
       bind(ResourceView.class).toInstance(this.resourceView);
       bind(ApplicationACLsManager.class).toInstance(this.aclsManager);
       bind(LocalDirsHandlerService.class).toInstance(dirsHandler);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index b3fbc3e..3154ca3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -43,6 +43,11 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-api</artifactId>
     </dependency>
 
@@ -53,12 +58,57 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-common</artifactId>
+      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>javax.xml.bind</groupId>
+      <artifactId>jaxb-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java
deleted file mode 100644
index 955aad5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice;
-
-public class TimelineAggregator {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
new file mode 100644
index 0000000..bf72fb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service that handles writes to the timeline service for a single YARN
+ * application and writes them to the backing storage.
+ *
+ * App-related lifecycle management belongs here (hooks only delegate for now).
+ */
+@Private
+@Unstable
+public class AppLevelAggregatorService extends BaseAggregatorService {
+  private final String applicationId;
+  // TODO define key metadata such as flow metadata, user, and queue
+
+  public AppLevelAggregatorService(String applicationId) {
+    super(AppLevelAggregatorService.class.getName() + " - " + applicationId);
+    this.applicationId = applicationId;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
new file mode 100644
index 0000000..05d321f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * Class that manages adding and removing app level aggregator services and
+ * their lifecycle. It provides thread-safe access to the app level services.
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ */
+@Private
+@Unstable
+public class AppLevelServiceManager extends CompositeService {
+  private static final Log LOG =
+      LogFactory.getLog(AppLevelServiceManager.class);
+  private static final AppLevelServiceManager INSTANCE =
+      new AppLevelServiceManager();
+
+  // add/remove hold the map's own monitor so check-then-act stays atomic
+  private final Map<String,AppLevelAggregatorService> services =
+      Collections.synchronizedMap(
+          new HashMap<String,AppLevelAggregatorService>());
+
+  static AppLevelServiceManager getInstance() {
+    return INSTANCE;
+  }
+
+  AppLevelServiceManager() {
+    super(AppLevelServiceManager.class.getName());
+  }
+
+  /**
+   * Creates, initializes, and starts an app level aggregator service for the
+   * specified application id, unless one already exists.
+   *
+   * @param appId the application id
+   * @throws YarnRuntimeException if there was any exception in initializing and
+   * starting the app level service
+   * @return whether it was added successfully
+   */
+  public boolean addService(String appId) {
+    synchronized (services) {
+      AppLevelAggregatorService service = services.get(appId);
+      if (service == null) {
+        try {
+          service = new AppLevelAggregatorService(appId);
+          // initialize, start, and track it in the map; NOTE(review): it is
+          // not added to the parent service, so confirm how it gets cleaned up
+          service.init(getConfig());
+          service.start();
+          services.put(appId, service);
+          LOG.info("the application aggregator service for " + appId +
+              " was added");
+          return true;
+        } catch (Exception e) {
+          throw new YarnRuntimeException(e);
+        }
+      } else {
+        String msg = "the application aggregator service for " + appId +
+            " already exists!";
+        LOG.error(msg);
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Removes and stops the app level aggregator service for the specified
+   * application id. If the service does not exist, no change is made.
+   *
+   * @param appId the application id
+   * @return whether it was removed successfully
+   */
+  public boolean removeService(String appId) {
+    synchronized (services) {
+      AppLevelAggregatorService service = services.remove(appId);
+      if (service == null) {
+        String msg = "the application aggregator service for " + appId +
+            " does not exist!";
+        LOG.error(msg);
+        return false;
+      } else {
+        // stop the service to do clean up
+        service.stop();
+        LOG.info("the application aggregator service for " + appId +
+            " was removed");
+        return true;
+      }
+    }
+  }
+
+  /**
+   * Returns the app level aggregator service for the specified application id.
+   *
+   * @return the app level aggregator service or null if it does not exist
+   */
+  public AppLevelAggregatorService getService(String appId) {
+    return services.get(appId);
+  }
+
+  /**
+   * Returns whether the app level aggregator service for the specified
+   * application id exists.
+   */
+  public boolean hasService(String appId) {
+    return services.containsKey(appId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
new file mode 100644
index 0000000..8768575
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import com.google.inject.Provider;
+
+/**
+ * A Guice provider that returns the global singleton instance of
+ * {@link AppLevelServiceManager}.
+ */
+public class AppLevelServiceManagerProvider
+    implements Provider<AppLevelServiceManager> {
+  @Override
+  public AppLevelServiceManager get() {
+    return AppLevelServiceManager.getInstance();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
new file mode 100644
index 0000000..994c66f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage.
+ *
+ * Classes that extend this can add their own lifecycle management or
+ * customization of request handling.
+ */
+@Private
+@Unstable
+public class BaseAggregatorService extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class);
+
+  public BaseAggregatorService(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  /**
+   * Handles entity writes. These writes are synchronous and are written to the
+   * backing storage without buffering/batching. If any entity already exists,
+   * it results in an update of the entity.
+   *
+   * This method should be reserved for selected critical entities and events.
+   * For normal voluminous writes one should use the async method
+   * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   * @return the response with the result of the post; currently always null
+   */
+  public TimelinePutResponse postEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // TODO implement (nothing is written yet; only debug logging)
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
+          callerUgi + ")");
+    }
+    return null;
+  }
+
+  /**
+   * Handles entity writes in an asynchronous manner. The method returns as soon
+   * as validation is done. No promises are made on how quickly it will be
+   * written to the backing storage or if it will always be written to the
+   * backing storage. Multiple writes to the same entities may be batched and
+   * appropriate values updated and result in fewer writes to the backing
+   * storage.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   */
+  public void postEntitiesAsync(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // TODO implement (no validation or write happens yet; only debug logging)
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
+          callerUgi + ")");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
new file mode 100644
index 0000000..6371e82
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The top-level server for the per-node timeline aggregator service. Currently
+ * it is defined as an auxiliary service to accommodate running within another
+ * daemon (e.g. node manager).
+ */
+@Private
+@Unstable
+public class PerNodeAggregatorServer extends AuxiliaryService {
+  private static final Log LOG =
+      LogFactory.getLog(PerNodeAggregatorServer.class);
+  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  private final AppLevelServiceManager serviceManager;
+  private WebApp webApp;
+
+  public PerNodeAggregatorServer() {
+    // delegate to the other constructor with the shared singleton manager
+    this(AppLevelServiceManager.getInstance());
+  }
+
+  @VisibleForTesting
+  PerNodeAggregatorServer(AppLevelServiceManager serviceManager) {
+    super("timeline_aggregator"); // name passed to AuxiliaryService
+    this.serviceManager = serviceManager; // lets tests supply their own
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    serviceManager.init(conf); // the manager receives the same configuration
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    serviceManager.start(); // started before the webapp that routes to it
+    startWebApp(); // throws YarnRuntimeException if the webapp cannot start
+  }
+
+  /**
+   * Stops the web app (if it was started), then the service manager, then the
+   * parent service.
+   *
+   * @throws Exception if any of the stop steps fails
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      if (webApp != null) {
+        webApp.stop();
+      }
+    } finally {
+      // a failure to stop the web app must not leave the service manager (and
+      // the app level services it holds) or the parent service running
+      try {
+        serviceManager.stop();
+      } finally {
+        super.serviceStop();
+      }
+    }
+  }
+
+  private void startWebApp() {
+    Configuration conf = getConfig();
+    // use the same ports as the old ATS for now; we could create new properties
+    // for the new timeline service if needed
+    String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+                          YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+                          WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
+    LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
+    try {
+      webApp =
+          WebApps
+            .$for("timeline", null, null, "ws")
+            .with(conf).at(bindAddress).start(
+                new TimelineServiceWebApp());
+    } catch (Exception e) {
+      String msg = "The per-node aggregator webapp failed to start.";
+      LOG.error(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+  }
+
+  private static class TimelineServiceWebApp
+      extends WebApp implements YarnWebParams {
+    @Override
+    public void setup() {
+      bind(PerNodeAggregatorWebService.class); // the REST end point class
+      // the manager is obtained through a provider (the global singleton)
+      bind(AppLevelServiceManager.class).
+          toProvider(AppLevelServiceManagerProvider.class);
+    }
+  }
+
+  // these methods can be used as the basis for future service methods if the
+  // per-node aggregator runs separate from the node manager
+  /**
+   * Registers an app level aggregator service for the given application id by
+   * delegating to the service manager, keyed by the string form of the id. The
+   * service is also initialized and started. If the service already exists, no
+   * new service is created.
+   *
+   * @param appId the id of the application to add
+   * @return whether it was added successfully
+   */
+  public boolean addApplication(ApplicationId appId) {
+    return serviceManager.addService(appId.toString());
+  }
+
+  /**
+   * Unregisters the app level aggregator service of the given application id
+   * by delegating to the service manager, keyed by the string form of the id.
+   * The service is also stopped as a result. If the service does not exist, no
+   * change is made.
+   *
+   * @param appId the id of the application to remove
+   * @return whether it was removed successfully
+   */
+  public boolean removeApplication(ApplicationId appId) {
+    return serviceManager.removeService(appId.toString());
+  }
+
+  /**
+   * Intercepts the creation of an AM container and adds the app level
+   * aggregator service of the owning application. The service is also
+   * initialized and started. If the service already exists, no new service is
+   * created. Containers that are not recognized as an AM are ignored.
+   *
+   * @param context the context of the container being initialized
+   */
+  @Override
+  public void initializeContainer(ContainerInitializationContext context) {
+    if (!isApplicationMaster(context)) {
+      // only the AM container drives the app level aggregator lifecycle
+      return;
+    }
+    ContainerId amContainerId = context.getContainerId();
+    addApplication(
+        amContainerId.getApplicationAttemptId().getApplicationId());
+  }
+
+  /**
+   * Intercepts the stopping of an AM container and removes the app level
+   * aggregator service of the owning application. The service is also stopped
+   * as a result. If the service does not exist, no change is made. Containers
+   * that are not recognized as an AM are ignored.
+   *
+   * @param context the context of the container being stopped
+   */
+  @Override
+  public void stopContainer(ContainerTerminationContext context) {
+    if (!isApplicationMaster(context)) {
+      // only the AM container drives the app level aggregator lifecycle
+      return;
+    }
+    ContainerId amContainerId = context.getContainerId();
+    removeApplication(
+        amContainerId.getApplicationAttemptId().getApplicationId());
+  }
+
+  /**
+   * Tells whether the container of the given context looks like an
+   * application master container.
+   *
+   * @param context the container context to inspect
+   * @return true if the sequence number of the container id is 1
+   */
+  private boolean isApplicationMaster(ContainerContext context) {
+    // TODO this is based on a (shaky) assumption that the container id (the
+    // last field of the full container id) for an AM is always 1
+    // we want to make this much more reliable
+    ContainerId containerId = context.getContainerId();
+    // the long id carries the cluster epoch in its upper bits; mask it off so
+    // that the AM is still recognized once the epoch is non-zero
+    long sequenceNumber =
+        containerId.getContainerId() & ContainerId.CONTAINER_ID_BITMASK;
+    return sequenceNumber == 1L;
+  }
+
+  @VisibleForTesting
+  boolean hasApplication(String appId) {
+    return serviceManager.hasService(appId); // same key as addApplication
+  }
+
+  @Override
+  public void initializeApplication(ApplicationInitializationContext context) {
+  } // no-op; aggregators are added in initializeContainer for the AM
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext context) {
+  } // no-op; aggregators are removed in stopContainer for the AM
+
+  @Override
+  public ByteBuffer getMetaData() {
+    // TODO currently it is not used; we can return more meaningful data when
+    // we connect it with an AM (an empty buffer is returned until then)
+    return ByteBuffer.allocate(0);
+  }
+
+  /**
+   * Creates, initializes and starts a stand-alone per-node aggregator server
+   * with a fresh {@link YarnConfiguration}, after installing the default
+   * uncaught exception handler and registering a shutdown hook for the server.
+   * Any failure is logged and handed to {@link ExitUtil#terminate}.
+   *
+   * @param args the command line arguments, used for the startup message only
+   * @return the server instance, or null if it could not even be constructed
+   */
+  @VisibleForTesting
+  static PerNodeAggregatorServer launchServer(String[] args) {
+    Thread.setDefaultUncaughtExceptionHandler(
+        new YarnUncaughtExceptionHandler());
+    StringUtils.startupShutdownMessage(
+        PerNodeAggregatorServer.class, args, LOG);
+    PerNodeAggregatorServer aggregatorServer = null;
+    try {
+      aggregatorServer = new PerNodeAggregatorServer();
+      ShutdownHookManager.get().addShutdownHook(
+          new ShutdownHook(aggregatorServer), SHUTDOWN_HOOK_PRIORITY);
+      YarnConfiguration yarnConf = new YarnConfiguration();
+      aggregatorServer.init(yarnConf);
+      aggregatorServer.start();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting PerNodeAggregatorServer", t);
+      ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
+    }
+    return aggregatorServer;
+  }
+
+  /**
+   * Shutdown hook that stops the server it was created for.
+   */
+  private static final class ShutdownHook implements Runnable {
+    private final PerNodeAggregatorServer target;
+
+    public ShutdownHook(PerNodeAggregatorServer server) {
+      target = server;
+    }
+
+    @Override
+    public void run() {
+      target.stop();
+    }
+  }
+
+  public static void main(String[] args) {
+    launchServer(args); // startup failures are handed to ExitUtil.terminate
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
new file mode 100644
index 0000000..2d96699
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The main per-node REST end point for timeline service writes. It is
+ * essentially a container service that routes requests to the appropriate
+ * per-app services.
+ */
+@Private
+@Unstable
+@Singleton
+@Path("/ws/v2/timeline")
+public class PerNodeAggregatorWebService {
+  private static final Log LOG =
+      LogFactory.getLog(PerNodeAggregatorWebService.class);
+
+  private final AppLevelServiceManager serviceManager;
+
+  @Inject
+  public PerNodeAggregatorWebService(AppLevelServiceManager serviceManager) {
+    this.serviceManager = serviceManager; // used to look up per-app services
+  }
+
+  @XmlRootElement(name = "about")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class AboutInfo {
+    // response bean for the GET end point; exposes a single "About" element
+    private String about;
+
+    public AboutInfo() {
+      // no-arg constructor; presumably required by JAXB - TODO confirm
+    }
+
+    public AboutInfo(String about) {
+      this.about = about;
+    }
+
+    @XmlElement(name = "About")
+    public String getAbout() {
+      return about;
+    }
+
+    public void setAbout(String about) {
+      this.about = about;
+    }
+
+  }
+
+  /**
+   * Returns a fixed "Timeline API" description of the timeline web services.
+   */
+  @GET
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public AboutInfo about(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res) {
+    init(res);
+    return new AboutInfo("Timeline API");
+  }
+
+  /**
+   * Accepts writes to the aggregator, and returns a response. It simply routes
+   * the request to the app level aggregator. It expects an application as a
+   * context.
+   *
+   * @param req the servlet request; supplies the remote user and the
+   *          application context
+   * @param res the servlet response
+   * @param entities the timeline entities to write
+   * @return the response produced by the app level aggregator
+   * @throws ForbiddenException if the request carries no remote user
+   * @throws NotFoundException if no app level aggregator is found for the
+   *           request
+   * @throws WebApplicationException with status 500 on any other failure
+   */
+  @POST
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelinePutResponse postEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      TimelineEntities entities) {
+    init(res);
+    UserGroupInformation callerUgi = getUser(req);
+    if (callerUgi == null) {
+      String msg = "The owner of the posted timeline entities is not set";
+      LOG.error(msg);
+      throw new ForbiddenException(msg);
+    }
+
+    // TODO how to express async posts and handle them
+    try {
+      AppLevelAggregatorService service = getAggregatorService(req);
+      if (service == null) {
+        LOG.error("Application not found");
+        throw new NotFoundException(); // different exception?
+      }
+      return service.postEntities(entities, callerUgi);
+    } catch (WebApplicationException e) {
+      // already carries the intended HTTP status (e.g. the 404 above); let it
+      // through instead of rewrapping it as a 500
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Error putting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Looks up the app level aggregator service for the application that the
+   * request refers to.
+   *
+   * @param req the servlet request
+   * @return whatever the service manager returns for the application id
+   */
+  private AppLevelAggregatorService
+      getAggregatorService(HttpServletRequest req) {
+    return serviceManager.getService(getApplicationId(req));
+  }
+
+  private String getApplicationId(HttpServletRequest req) {
+    // TODO extract the application id from the request (most likely from
+    // the URI); until that is implemented this always returns null
+    return null;
+  }
+
+  private void init(HttpServletResponse response) {
+    response.setContentType(null); // NOTE(review): presumably a reset; confirm
+  }
+
+  private UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUgi = null;
+    if (remoteUser != null) {
+      callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUgi;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java
deleted file mode 100644
index 7e0b775..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice;
-
-public class TestTimelineAggregator {
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
new file mode 100644
index 0000000..c0af8c5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+
+public class TestAppLevelAggregatorService {
+} // placeholder: no test cases yet

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
new file mode 100644
index 0000000..3f981c7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestAppLevelServiceManager {
+
+  /**
+   * Adds one service per application id from concurrent threads and checks
+   * that every add succeeds and every id is registered afterwards.
+   */
+  @Test(timeout=60000)
+  public void testMultithreadedAdd() throws Exception {
+    final AppLevelServiceManager manager = spy(new AppLevelServiceManager());
+    doReturn(new Configuration()).when(manager).getConfig();
+
+    final int appCount = 5;
+    // one task per application id
+    List<Callable<Boolean>> addTasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < appCount; i++) {
+      final String appId = String.valueOf(i);
+      addTasks.add(new Callable<Boolean>() {
+        public Boolean call() {
+          return manager.addService(appId);
+        }
+      });
+    }
+    ExecutorService pool = Executors.newFixedThreadPool(appCount);
+    try {
+      for (Future<Boolean> outcome : pool.invokeAll(addTasks)) {
+        assertTrue(outcome.get());
+      }
+    } finally {
+      pool.shutdownNow();
+    }
+    // every application id should now be known to the manager
+    for (int i = 0; i < appCount; i++) {
+      assertTrue(manager.hasService(String.valueOf(i)));
+    }
+  }
+
+  /**
+   * Adds and then removes one service per application id from concurrent
+   * threads and checks that every add/remove pair succeeds and that no id is
+   * left registered. A timeout keeps a stuck thread from hanging the build,
+   * in line with {@code testMultithreadedAdd}.
+   */
+  @Test(timeout=60000)
+  public void testMultithreadedAddAndRemove() throws Exception {
+    final AppLevelServiceManager serviceManager =
+        spy(new AppLevelServiceManager());
+    doReturn(new Configuration()).when(serviceManager).getConfig();
+
+    final int NUM_APPS = 5;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final String appId = String.valueOf(i);
+      Callable<Boolean> task = new Callable<Boolean>() {
+        public Boolean call() {
+          return serviceManager.addService(appId) &&
+              serviceManager.removeService(appId);
+        }
+      };
+      tasks.add(task);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+    try {
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      assertFalse(serviceManager.hasService(String.valueOf(i)));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
new file mode 100644
index 0000000..55953cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+public class TestBaseAggregatorService {
+  // placeholder: no test cases yet
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
new file mode 100644
index 0000000..902047d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.junit.Test;
+
+/**
+ * Tests for {@link PerNodeAggregatorServer}: which applications it tracks
+ * after container initialization/termination callbacks, and launching the
+ * server through {@code launchServer}.
+ *
+ * <p>The tests use container id 1 of the attempt as the AM container and
+ * container id 2 as a non-AM container.
+ */
+public class TestPerNodeAggregatorServer {
+  /** Attempt id from which every container id in this class is derived. */
+  private final ApplicationAttemptId appAttemptId;
+
+  /** Builds an application attempt id (attempt 1 of a new application id). */
+  public TestPerNodeAggregatorServer() {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+  }
+
+  /** Initializing the AM container makes the aggregator know the app. */
+  @Test
+  public void testAddApplication() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
+    try {
+      // aggregator should have a single app
+      assertTrue(aggregator.hasApplication(
+          appAttemptId.getApplicationId().toString()));
+    } finally {
+      // close even if the assertion above fails
+      aggregator.close();
+    }
+  }
+
+  /** Initializing a non-AM container must not register the app. */
+  @Test
+  public void testAddApplicationNonAMContainer() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregator();
+    try {
+      ContainerId containerId = getContainerId(2L); // not an AM
+      ContainerInitializationContext context =
+          mock(ContainerInitializationContext.class);
+      when(context.getContainerId()).thenReturn(containerId);
+      aggregator.initializeContainer(context);
+      // aggregator should not have that app
+      assertFalse(aggregator.hasApplication(
+          appAttemptId.getApplicationId().toString()));
+    } finally {
+      // close the aggregator here as well, like the sibling tests do
+      aggregator.close();
+    }
+  }
+
+  /** Stopping the AM container removes the app from the aggregator. */
+  @Test
+  public void testRemoveApplication() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
+    try {
+      // aggregator should have a single app
+      String appIdStr = appAttemptId.getApplicationId().toString();
+      assertTrue(aggregator.hasApplication(appIdStr));
+
+      ContainerId containerId = getAMContainerId();
+      ContainerTerminationContext context =
+          mock(ContainerTerminationContext.class);
+      when(context.getContainerId()).thenReturn(containerId);
+      aggregator.stopContainer(context);
+      // aggregator should not have that app
+      assertFalse(aggregator.hasApplication(appIdStr));
+    } finally {
+      // close even if an assertion above fails
+      aggregator.close();
+    }
+  }
+
+  /** Stopping a non-AM container leaves the app registered. */
+  @Test
+  public void testRemoveApplicationNonAMContainer() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
+    try {
+      // aggregator should have a single app
+      String appIdStr = appAttemptId.getApplicationId().toString();
+      assertTrue(aggregator.hasApplication(appIdStr));
+
+      ContainerId containerId = getContainerId(2L); // not an AM
+      ContainerTerminationContext context =
+          mock(ContainerTerminationContext.class);
+      when(context.getContainerId()).thenReturn(containerId);
+      aggregator.stopContainer(context);
+      // aggregator should still have that app
+      assertTrue(aggregator.hasApplication(appIdStr));
+    } finally {
+      // close even if an assertion above fails
+      aggregator.close();
+    }
+  }
+
+  /**
+   * Launches the server with no arguments; the test fails if the launch
+   * raises an {@link ExitUtil.ExitException}. A launched server is always
+   * stopped afterwards.
+   */
+  @Test(timeout = 60000)
+  public void testLaunch() throws Exception {
+    ExitUtil.disableSystemExit();
+    PerNodeAggregatorServer server = null;
+    try {
+      server =
+          PerNodeAggregatorServer.launchServer(new String[0]);
+    } catch (ExitUtil.ExitException e) {
+      // Reset first: the assertion below may throw, and the recorded exit
+      // exception must not be left behind for later tests.
+      ExitUtil.resetFirstExitException();
+      assertEquals("unexpected exit status from launchServer", 0, e.status);
+      fail("launchServer requested an exit instead of returning a server");
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  /**
+   * Creates an aggregator and feeds it an initialization callback for the
+   * AM container of {@link #appAttemptId}.
+   *
+   * @return the aggregator after the callback has been delivered
+   */
+  private PerNodeAggregatorServer createAggregatorAndAddApplication() {
+    PerNodeAggregatorServer aggregator = createAggregator();
+    // create an AM container
+    ContainerId containerId = getAMContainerId();
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    aggregator.initializeContainer(context);
+    return aggregator;
+  }
+
+  /**
+   * Creates a spied aggregator backed by a spied
+   * {@link AppLevelServiceManager} whose {@code getConfig()} is stubbed to
+   * return a fresh {@link Configuration}.
+   *
+   * @return the spied aggregator
+   */
+  private PerNodeAggregatorServer createAggregator() {
+    AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager());
+    doReturn(new Configuration()).when(serviceManager).getConfig();
+    return spy(new PerNodeAggregatorServer(serviceManager));
+  }
+
+  /** @return the container id these tests use for the AM (id 1) */
+  private ContainerId getAMContainerId() {
+    return getContainerId(1L);
+  }
+
+  /**
+   * @param id container number within {@link #appAttemptId}
+   * @return a container id for that number under the shared attempt
+   */
+  private ContainerId getContainerId(long id) {
+    return ContainerId.newContainerId(appAttemptId, id);
+  }
+}