You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/01/27 19:44:47 UTC
hadoop git commit: YARN-3030. Set up TS aggregator with basic request
serving structure and lifecycle. Contributed by Sangjin Lee.
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 3cd9e8dd1 -> f26941b39
YARN-3030. Set up TS aggregator with basic request serving structure and lifecycle. Contributed by Sangjin Lee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f26941b3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f26941b3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f26941b3
Branch: refs/heads/YARN-2928
Commit: f26941b39028ac30c77547e2be2d657bb5bf044a
Parents: 3cd9e8d
Author: Zhijie Shen <zj...@apache.org>
Authored: Tue Jan 27 10:43:12 2015 -0800
Committer: Zhijie Shen <zj...@apache.org>
Committed: Tue Jan 27 10:43:12 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop-yarn-server-nodemanager/pom.xml | 5 +
.../server/nodemanager/webapp/WebServer.java | 10 +-
.../hadoop-yarn-server-timelineservice/pom.xml | 54 +++-
.../timelineservice/TimelineAggregator.java | 23 --
.../aggregator/AppLevelAggregatorService.java | 57 +++++
.../aggregator/AppLevelServiceManager.java | 136 ++++++++++
.../AppLevelServiceManagerProvider.java | 33 +++
.../aggregator/BaseAggregatorService.java | 104 ++++++++
.../aggregator/PerNodeAggregatorServer.java | 252 +++++++++++++++++++
.../aggregator/PerNodeAggregatorWebService.java | 168 +++++++++++++
.../timelineservice/TestTimelineAggregator.java | 23 --
.../TestAppLevelAggregatorService.java | 23 ++
.../aggregator/TestAppLevelServiceManager.java | 102 ++++++++
.../aggregator/TestBaseAggregatorService.java | 23 ++
.../aggregator/TestPerNodeAggregatorServer.java | 149 +++++++++++
16 files changed, 1116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4436fd2..ee4e6d9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -8,6 +8,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3063. Bootstrapping TimelineServer next generation module. (zjshen)
+ YARN-3030. Set up TS aggregator with basic request serving structure and
+ lifecycle. (Sangjin Lee via zjshen)
+
IMPROVEMENTS
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
index b1efa5f..26a33b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
@@ -53,6 +53,11 @@
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
index ca2f239..eae8889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -30,6 +29,9 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManager;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManagerProvider;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorWebService;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
@@ -112,6 +114,12 @@ public class WebServer extends AbstractService {
bind(NMWebServices.class);
bind(GenericExceptionHandler.class);
bind(JAXBContextResolver.class);
+ // host the timeline service aggregator web service temporarily
+ // (see YARN-3087)
+ bind(PerNodeAggregatorWebService.class);
+ // bind to the global singleton instance
+ bind(AppLevelServiceManager.class).
+ toProvider(AppLevelServiceManagerProvider.class);
bind(ResourceView.class).toInstance(this.resourceView);
bind(ApplicationACLsManager.class).toInstance(this.aclsManager);
bind(LocalDirsHandlerService.class).toInstance(dirsHandler);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index b3fbc3e..3154ca3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -43,6 +43,11 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
@@ -53,12 +58,57 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-common</artifactId>
+ <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+
+ <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+ <dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+ <artifactId>hadoop-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java
deleted file mode 100644
index 955aad5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice;
-
-public class TimelineAggregator {
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
new file mode 100644
index 0000000..bf72fb9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage for a given YARN application.
+ *
+ * App-related lifecycle management is handled by this service.
+ */
+@Private
+@Unstable
+public class AppLevelAggregatorService extends BaseAggregatorService {
+ private final String applicationId;
+ // TODO define key metadata such as flow metadata, user, and queue
+
+ public AppLevelAggregatorService(String applicationId) {
+ super(AppLevelAggregatorService.class.getName() + " - " + applicationId);
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
new file mode 100644
index 0000000..05d321f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * Class that manages adding and removing app level aggregator services and
+ * their lifecycle. It provides thread safety access to the app level services.
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ */
+@Private
+@Unstable
+public class AppLevelServiceManager extends CompositeService {
+ private static final Log LOG =
+ LogFactory.getLog(AppLevelServiceManager.class);
+ private static final AppLevelServiceManager INSTANCE =
+ new AppLevelServiceManager();
+
+ // access to this map is synchronized with the map itself
+ private final Map<String,AppLevelAggregatorService> services =
+ Collections.synchronizedMap(
+ new HashMap<String,AppLevelAggregatorService>());
+
+ static AppLevelServiceManager getInstance() {
+ return INSTANCE;
+ }
+
+ AppLevelServiceManager() {
+ super(AppLevelServiceManager.class.getName());
+ }
+
+ /**
+ * Creates and adds an app level aggregator service for the specified
+ * application id. The service is also initialized and started. If the service
+ * already exists, no new service is created.
+ *
+ * @throws YarnRuntimeException if there was any exception in initializing and
+ * starting the app level service
+ * @return whether it was added successfully
+ */
+ public boolean addService(String appId) {
+ synchronized (services) {
+ AppLevelAggregatorService service = services.get(appId);
+ if (service == null) {
+ try {
+ service = new AppLevelAggregatorService(appId);
+ // initialize, start, and add it to the parent service so it can be
+ // cleaned up when the parent shuts down
+ service.init(getConfig());
+ service.start();
+ services.put(appId, service);
+ LOG.info("the application aggregator service for " + appId +
+ " was added");
+ return true;
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ } else {
+ String msg = "the application aggregator service for " + appId +
+ " already exists!";
+ LOG.error(msg);
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Removes the app level aggregator service for the specified application id.
+ * The service is also stopped as a result. If the service does not exist, no
+ * change is made.
+ *
+ * @return whether it was removed successfully
+ */
+ public boolean removeService(String appId) {
+ synchronized (services) {
+ AppLevelAggregatorService service = services.remove(appId);
+ if (service == null) {
+ String msg = "the application aggregator service for " + appId +
+ " does not exist!";
+ LOG.error(msg);
+ return false;
+ } else {
+ // stop the service to do clean up
+ service.stop();
+ LOG.info("the application aggregator service for " + appId +
+ " was removed");
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Returns the app level aggregator service for the specified application id.
+ *
+ * @return the app level aggregator service or null if it does not exist
+ */
+ public AppLevelAggregatorService getService(String appId) {
+ return services.get(appId);
+ }
+
+ /**
+ * Returns whether the app level aggregator service for the specified
+ * application id exists.
+ */
+ public boolean hasService(String appId) {
+ return services.containsKey(appId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
new file mode 100644
index 0000000..8768575
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import com.google.inject.Provider;
+
+/**
+ * A guice provider that provides a global singleton instance of
+ * AppLevelServiceManager.
+ */
+public class AppLevelServiceManagerProvider
+ implements Provider<AppLevelServiceManager> {
+ @Override
+ public AppLevelServiceManager get() {
+ return AppLevelServiceManager.getInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
new file mode 100644
index 0000000..994c66f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+/**
+ * Base service for accepting timeline writes and persisting them to the
+ * backing storage.
+ *
+ * Subclasses may layer their own lifecycle management on top of it or
+ * customize how requests are handled.
+ */
+@Private
+@Unstable
+public class BaseAggregatorService extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class);
+
+  /**
+   * @param name the service name
+   */
+  public BaseAggregatorService(String name) {
+    super(name);
+  }
+
+  /** Initializes the service; currently only delegates to the parent. */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  /** Starts the service; currently only delegates to the parent. */
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  /** Stops the service; currently only delegates to the parent. */
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  /**
+   * Synchronous entity write. Entities are meant to go to the backing storage
+   * without buffering/batching, and an entity that already exists is meant to
+   * be updated.
+   *
+   * Reserve this method for selected critical entities and events. For normal
+   * voluminous writes use the async variant,
+   * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+   *
+   * Not implemented yet: the call is only traced at debug level and
+   * {@code null} is returned.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   * @return the response that contains the result of the post.
+   */
+  public TimelinePutResponse postEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // TODO implement
+    traceCall("postEntities", entities, callerUgi);
+    return null;
+  }
+
+  /**
+   * Asynchronous entity write. The method is meant to return as soon as
+   * validation is done, with no promise on how quickly, or whether, the data
+   * reaches the backing storage. Multiple writes to the same entities may be
+   * batched and merged, resulting in fewer writes to the backing storage.
+   *
+   * Not implemented yet: the call is only traced at debug level.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   */
+  public void postEntitiesAsync(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // TODO implement
+    traceCall("postEntitiesAsync", entities, callerUgi);
+  }
+
+  // logs the invoked method and its arguments when debug logging is enabled
+  private static void traceCall(String method, TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    LOG.debug(method + "(entities=" + entities + ", callerUgi=" +
+        callerUgi + ")");
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
new file mode 100644
index 0000000..6371e82
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The top-level server for the per-node timeline aggregator service. Currently
+ * it is defined as an auxiliary service to accommodate running within another
+ * daemon (e.g. node manager).
+ */
+@Private
+@Unstable
+public class PerNodeAggregatorServer extends AuxiliaryService {
+ private static final Log LOG =
+ LogFactory.getLog(PerNodeAggregatorServer.class);
+ private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+ private final AppLevelServiceManager serviceManager;
+ private WebApp webApp;
+
+ /**
+ * Creates the server backed by the global {@link AppLevelServiceManager}
+ * singleton.
+ */
+ public PerNodeAggregatorServer() {
+ // use the same singleton
+ this(AppLevelServiceManager.getInstance());
+ }
+
+ /**
+ * Creates the server with an explicitly supplied service manager. The
+ * service is named "timeline_aggregator".
+ *
+ * @param serviceManager manager of the app level aggregator services
+ */
+ @VisibleForTesting
+ PerNodeAggregatorServer(AppLevelServiceManager serviceManager) {
+ super("timeline_aggregator");
+ this.serviceManager = serviceManager;
+ }
+
+ /**
+ * Initializes the app level service manager with the given configuration
+ * and then initializes this service.
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ serviceManager.init(conf);
+ super.serviceInit(conf);
+ }
+
+ /**
+ * Starts this service, then the app level service manager, and finally the
+ * per-node aggregator web app.
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ serviceManager.start();
+ startWebApp();
+ }
+
+ /**
+ * Stops the web app (if it was started), then the app level service manager,
+ * and finally this service. Each step runs even if an earlier one throws, so
+ * a failure to stop the web app cannot leave the service manager running.
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+   try {
+     if (webApp != null) {
+       webApp.stop();
+     }
+   } finally {
+     try {
+       // stop the service manager
+       serviceManager.stop();
+     } finally {
+       super.serviceStop();
+     }
+   }
+ }
+
+ /**
+ * Builds and starts the per-node aggregator web app and stores it in the
+ * {@code webApp} field. The bind address is derived from the timeline service
+ * bind host setting and the AHS web app address.
+ *
+ * @throws YarnRuntimeException if the web app fails to start
+ */
+ private void startWebApp() {
+ Configuration conf = getConfig();
+ // use the same ports as the old ATS for now; we could create new properties
+ // for the new timeline service if needed
+ String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+ YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+ WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
+ LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
+ try {
+ webApp =
+ WebApps
+ .$for("timeline", null, null, "ws")
+ .with(conf).at(bindAddress).start(
+ new TimelineServiceWebApp());
+ } catch (Exception e) {
+ // log and rethrow as an unchecked exception, keeping the original cause
+ String msg = "The per-node aggregator webapp failed to start.";
+ LOG.error(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ }
+
+ /**
+ * Web app definition for the per-node aggregator. It binds the aggregator
+ * web service and makes the global {@link AppLevelServiceManager} singleton
+ * available for injection.
+ */
+ private static class TimelineServiceWebApp
+ extends WebApp implements YarnWebParams {
+ @Override
+ public void setup() {
+ bind(PerNodeAggregatorWebService.class);
+ // bind to the global singleton
+ bind(AppLevelServiceManager.class).
+ toProvider(AppLevelServiceManagerProvider.class);
+ }
+ }
+
+ // these methods can be used as the basis for future service methods if the
+ // per-node aggregator runs separate from the node manager
+ /**
+ * Registers the given application with the service manager, which creates,
+ * initializes, and starts an app level aggregator service for it. Nothing
+ * new is created when a service for the application already exists.
+ *
+ * @param appId id of the application to add
+ * @return whether it was added successfully
+ */
+ public boolean addApplication(ApplicationId appId) {
+   return serviceManager.addService(appId.toString());
+ }
+
+  /**
+   * Unregisters the app level aggregator service of the given application by
+   * delegating to the service manager, keyed by the string form of the
+   * application id. The service is also stopped as a result. If the service
+   * does not exist, no change is made.
+   *
+   * @param appId the application whose service is removed
+   * @return whether it was removed successfully
+   */
+  public boolean removeApplication(ApplicationId appId) {
+    return serviceManager.removeService(appId.toString());
+  }
+
+  /**
+   * Intercepts the creation of an AM container and adds the app level
+   * aggregator service for its application. The service is also initialized
+   * and started. If the service already exists, no new service is created.
+   * Containers that are not recognized as an AM are ignored.
+   */
+  @Override
+  public void initializeContainer(ContainerInitializationContext context) {
+    if (!isApplicationMaster(context)) {
+      // only the AM container triggers the app level aggregator service
+      return;
+    }
+    ApplicationId applicationId =
+        context.getContainerId().getApplicationAttemptId().getApplicationId();
+    addApplication(applicationId);
+  }
+
+  /**
+   * Intercepts the stopping of an AM container and removes the app level
+   * aggregator service of its application. The service is also stopped as a
+   * result. If the service does not exist, no change is made. Containers that
+   * are not recognized as an AM are ignored.
+   */
+  @Override
+  public void stopContainer(ContainerTerminationContext context) {
+    if (!isApplicationMaster(context)) {
+      // only the AM container controls the app level aggregator service
+      return;
+    }
+    ApplicationId applicationId =
+        context.getContainerId().getApplicationAttemptId().getApplicationId();
+    removeApplication(applicationId);
+  }
+
+ private boolean isApplicationMaster(ContainerContext context) {
+ // TODO this is based on a (shaky) assumption that the container id (the
+ // last field of the full container id) for an AM is always 1
+ // we want to make this much more reliable
+ ContainerId containerId = context.getContainerId();
+ return containerId.getContainerId() == 1L;
+ }
+
+ /**
+ * Reports whether the service manager holds a service for the given
+ * application id string. Exposed for tests.
+ */
+ @VisibleForTesting
+ boolean hasApplication(String appId) {
+ return serviceManager.hasService(appId);
+ }
+
+ /**
+ * Intentionally does nothing; app level services are added from
+ * initializeContainer() instead.
+ */
+ @Override
+ public void initializeApplication(ApplicationInitializationContext context) {
+ }
+
+ /**
+ * Intentionally does nothing; app level services are removed from
+ * stopContainer() instead.
+ */
+ @Override
+ public void stopApplication(ApplicationTerminationContext context) {
+ }
+
+ /**
+ * Returns the metadata of this service, which for now is a newly allocated
+ * buffer of capacity zero.
+ */
+ @Override
+ public ByteBuffer getMetaData() {
+ // TODO currently it is not used; we can return a more meaningful data when
+ // we connect it with an AM
+ return ByteBuffer.allocate(0);
+ }
+
+ @VisibleForTesting
+ static PerNodeAggregatorServer launchServer(String[] args) {
+ Thread
+ .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args,
+ LOG);
+ PerNodeAggregatorServer server = null;
+ try {
+ server = new PerNodeAggregatorServer();
+ ShutdownHookManager.get().addShutdownHook(new ShutdownHook(server),
+ SHUTDOWN_HOOK_PRIORITY);
+ YarnConfiguration conf = new YarnConfiguration();
+ server.init(conf);
+ server.start();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting PerNodeAggregatorServer", t);
+ ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
+ }
+ return server;
+ }
+
+  /**
+   * Shutdown hook that stops the aggregator server it was created with.
+   */
+  private static class ShutdownHook implements Runnable {
+    private final PerNodeAggregatorServer server;
+
+    public ShutdownHook(PerNodeAggregatorServer serverToStop) {
+      server = serverToStop;
+    }
+
+    @Override
+    public void run() {
+      server.stop();
+    }
+  }
+
+ /**
+ * Command line entry point; delegates to launchServer() and ignores the
+ * returned server instance.
+ */
+ public static void main(String[] args) {
+ launchServer(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
new file mode 100644
index 0000000..2d96699
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The main per-node REST end point for timeline service writes. It is
+ * essentially a container service that routes requests to the appropriate
+ * per-app services.
+ */
+@Private
+@Unstable
+@Singleton
+@Path("/ws/v2/timeline")
+public class PerNodeAggregatorWebService {
+ private static final Log LOG =
+ LogFactory.getLog(PerNodeAggregatorWebService.class);
+
+ private final AppLevelServiceManager serviceManager;
+
+ @Inject
+ public PerNodeAggregatorWebService(AppLevelServiceManager serviceManager) {
+ this.serviceManager = serviceManager;
+ }
+
+ @XmlRootElement(name = "about")
+ @XmlAccessorType(XmlAccessType.NONE)
+ @Public
+ @Unstable
+ public static class AboutInfo {
+
+ private String about;
+
+ public AboutInfo() {
+
+ }
+
+ public AboutInfo(String about) {
+ this.about = about;
+ }
+
+ @XmlElement(name = "About")
+ public String getAbout() {
+ return about;
+ }
+
+ public void setAbout(String about) {
+ this.about = about;
+ }
+
+ }
+
+ /**
+ * Return the description of the timeline web services.
+ */
+ @GET
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public AboutInfo about(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res) {
+ init(res);
+ return new AboutInfo("Timeline API");
+ }
+
+ /**
+ * Accepts writes to the aggregator, and returns a response. It simply routes
+ * the request to the app level aggregator. It expects an application as a
+ * context.
+ */
+ @POST
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelinePutResponse postEntities(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ TimelineEntities entities) {
+ init(res);
+ UserGroupInformation callerUgi = getUser(req);
+ if (callerUgi == null) {
+ String msg = "The owner of the posted timeline entities is not set";
+ LOG.error(msg);
+ throw new ForbiddenException(msg);
+ }
+
+ // TODO how to express async posts and handle them
+ try {
+ AppLevelAggregatorService service = getAggregatorService(req);
+ if (service == null) {
+ LOG.error("Application not found");
+ throw new NotFoundException(); // different exception?
+ }
+ return service.postEntities(entities, callerUgi);
+ } catch (Exception e) {
+ LOG.error("Error putting entities", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private AppLevelAggregatorService
+ getAggregatorService(HttpServletRequest req) {
+ String appIdString = getApplicationId(req);
+ return serviceManager.getService(appIdString);
+ }
+
+ private String getApplicationId(HttpServletRequest req) {
+ // TODO the application id from the request
+ // (most likely from the URI)
+ return null;
+ }
+
+ private void init(HttpServletResponse response) {
+ response.setContentType(null);
+ }
+
+ private UserGroupInformation getUser(HttpServletRequest req) {
+ String remoteUser = req.getRemoteUser();
+ UserGroupInformation callerUgi = null;
+ if (remoteUser != null) {
+ callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ return callerUgi;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java
deleted file mode 100644
index 7e0b775..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice;
-
-public class TestTimelineAggregator {
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
new file mode 100644
index 0000000..c0af8c5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+
+ /**
+ * Placeholder for the AppLevelAggregatorService tests; it defines no test
+ * cases yet.
+ */
+ public class TestAppLevelAggregatorService {
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
new file mode 100644
index 0000000..3f981c7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+/**
+ * Exercises concurrent additions and removals of app level services on an
+ * {@link AppLevelServiceManager}.
+ */
+public class TestAppLevelServiceManager {
+  private static final int NUM_APPS = 5;
+
+  @Test(timeout=60000)
+  public void testMultithreadedAdd() throws Exception {
+    final AppLevelServiceManager serviceManager = createServiceManager();
+
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final String appId = String.valueOf(i);
+      tasks.add(new Callable<Boolean>() {
+        @Override
+        public Boolean call() {
+          return serviceManager.addService(appId);
+        }
+      });
+    }
+    assertAllSucceed(tasks);
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      assertTrue(serviceManager.hasService(String.valueOf(i)));
+    }
+  }
+
+  // bounded like testMultithreadedAdd so that a hang in the concurrent
+  // add/remove path fails the test instead of blocking the build
+  @Test(timeout=60000)
+  public void testMultithreadedAddAndRemove() throws Exception {
+    final AppLevelServiceManager serviceManager = createServiceManager();
+
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final String appId = String.valueOf(i);
+      tasks.add(new Callable<Boolean>() {
+        @Override
+        public Boolean call() {
+          return serviceManager.addService(appId) &&
+              serviceManager.removeService(appId);
+        }
+      });
+    }
+    assertAllSucceed(tasks);
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      assertFalse(serviceManager.hasService(String.valueOf(i)));
+    }
+  }
+
+  /**
+   * Creates a spied service manager whose getConfig() returns a fresh
+   * Configuration.
+   */
+  private static AppLevelServiceManager createServiceManager() {
+    AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager());
+    doReturn(new Configuration()).when(serviceManager).getConfig();
+    return serviceManager;
+  }
+
+  /**
+   * Runs all tasks on a pool with one thread per task and asserts that each
+   * of them returned true.
+   */
+  private static void assertAllSucceed(List<Callable<Boolean>> tasks)
+      throws Exception {
+    ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
+    try {
+      for (Future<Boolean> future : executor.invokeAll(tasks)) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
new file mode 100644
index 0000000..55953cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+ /**
+ * Placeholder for the BaseAggregatorService tests; it defines no test cases
+ * yet.
+ */
+ public class TestBaseAggregatorService {
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f26941b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
new file mode 100644
index 0000000..902047d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.junit.Test;
+
+/**
+ * Tests how {@link PerNodeAggregatorServer} adds and removes applications in
+ * response to container events, and that the stand-alone launch path works.
+ */
+public class TestPerNodeAggregatorServer {
+  private ApplicationAttemptId appAttemptId;
+
+  public TestPerNodeAggregatorServer() {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+  }
+
+  @Test
+  public void testAddApplication() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
+    // aggregator should have a single app
+    assertTrue(aggregator.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+    aggregator.close();
+  }
+
+  @Test
+  public void testAddApplicationNonAMContainer() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregator();
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    aggregator.initializeContainer(context);
+    // aggregator should not have that app
+    assertFalse(aggregator.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+    // release the aggregator like the other tests do
+    aggregator.close();
+  }
+
+  @Test
+  public void testRemoveApplication() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
+    // aggregator should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(aggregator.hasApplication(appIdStr));
+
+    ContainerId containerId = getAMContainerId();
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    aggregator.stopContainer(context);
+    // aggregator should not have that app
+    assertFalse(aggregator.hasApplication(appIdStr));
+    aggregator.close();
+  }
+
+  @Test
+  public void testRemoveApplicationNonAMContainer() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
+    // aggregator should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(aggregator.hasApplication(appIdStr));
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    aggregator.stopContainer(context);
+    // aggregator should still have that app
+    assertTrue(aggregator.hasApplication(appIdStr));
+    aggregator.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testLaunch() throws Exception {
+    ExitUtil.disableSystemExit();
+    PerNodeAggregatorServer server = null;
+    try {
+      server =
+          PerNodeAggregatorServer.launchServer(new String[0]);
+    } catch (ExitUtil.ExitException e) {
+      // clear the recorded exit before any assertion can throw, so that the
+      // reset is never skipped
+      ExitUtil.resetFirstExitException();
+      assertEquals(0, e.status);
+      fail();
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  /**
+   * Creates an aggregator and feeds it the initialization event of an AM
+   * container of the test application attempt.
+   */
+  private PerNodeAggregatorServer createAggregatorAndAddApplication() {
+    PerNodeAggregatorServer aggregator = createAggregator();
+    // create an AM container
+    ContainerId containerId = getAMContainerId();
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    aggregator.initializeContainer(context);
+    return aggregator;
+  }
+
+  /**
+   * Creates a spied aggregator backed by a spied service manager whose
+   * getConfig() returns a fresh Configuration.
+   */
+  private PerNodeAggregatorServer createAggregator() {
+    AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager());
+    doReturn(new Configuration()).when(serviceManager).getConfig();
+    PerNodeAggregatorServer aggregator =
+        spy(new PerNodeAggregatorServer(serviceManager));
+    return aggregator;
+  }
+
+  /** Returns the container id that the aggregator treats as the AM (id 1). */
+  private ContainerId getAMContainerId() {
+    return getContainerId(1L);
+  }
+
+  /** Builds a container id with the given number for the test attempt. */
+  private ContainerId getContainerId(long id) {
+    return ContainerId.newContainerId(appAttemptId, id);
+  }
+}