You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/03 20:32:21 UTC
[43/43] hadoop git commit: YARN-3210. Refactored timeline aggregator
according to new code organization proposed in YARN-3166. Contributed by Li
Lu.
YARN-3210. Refactored timeline aggregator according to new code organization proposed in YARN-3166. Contributed by Li Lu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3ff7f06
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3ff7f06
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3ff7f06
Branch: refs/heads/YARN-2928
Commit: d3ff7f06cbc66d3a23c2551e7d4c752689f46afe
Parents: e4d81eb
Author: Zhijie Shen <zj...@apache.org>
Authored: Tue Mar 3 11:21:03 2015 -0800
Committer: Zhijie Shen <zj...@apache.org>
Committed: Tue Mar 3 11:25:17 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../distributedshell/TestDistributedShell.java | 4 +-
.../hadoop-yarn-server-nodemanager/pom.xml | 5 -
.../server/nodemanager/webapp/WebServer.java | 3 -
.../TestTimelineServiceClientIntegration.java | 12 +-
.../aggregator/AppLevelAggregatorService.java | 57 ----
.../aggregator/AppLevelServiceManager.java | 136 ----------
.../AppLevelServiceManagerProvider.java | 33 ---
.../aggregator/AppLevelTimelineAggregator.java | 57 ++++
.../aggregator/BaseAggregatorService.java | 107 --------
.../aggregator/PerNodeAggregatorServer.java | 268 -------------------
.../aggregator/PerNodeAggregatorWebService.java | 180 -------------
.../PerNodeTimelineAggregatorsAuxService.java | 212 +++++++++++++++
.../aggregator/TimelineAggregator.java | 107 ++++++++
.../TimelineAggregatorWebService.java | 180 +++++++++++++
.../TimelineAggregatorsCollection.java | 203 ++++++++++++++
.../TestAppLevelAggregatorService.java | 23 --
.../aggregator/TestAppLevelServiceManager.java | 102 -------
.../TestAppLevelTimelineAggregator.java | 23 ++
.../aggregator/TestBaseAggregatorService.java | 23 --
.../aggregator/TestPerNodeAggregatorServer.java | 149 -----------
...estPerNodeTimelineAggregatorsAuxService.java | 150 +++++++++++
.../aggregator/TestTimelineAggregator.java | 23 ++
.../TestTimelineAggregatorsCollection.java | 108 ++++++++
24 files changed, 1074 insertions(+), 1094 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b13475a..0548460 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -23,6 +23,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3125. Made the distributed shell use timeline service next gen and
add an integration test for it. (Junping Du and Li Lu via zjshen)
+ YARN-3210. Refactored timeline aggregator according to new code
+ organization proposed in YARN-3166. (Li Lu via zjshen)
+
IMPROVEMENTS
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 71466cb..313dc97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -96,7 +96,7 @@ public class TestDistributedShell {
// enable aux-service based timeline aggregators
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
- + ".class", PerNodeAggregatorServer.class.getName());
+ + ".class", PerNodeTimelineAggregatorsAuxService.class.getName());
}
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
index 26a33b4..b1efa5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
@@ -53,11 +53,6 @@
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-timelineservice</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
index 77deaed..fdff480 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
@@ -29,9 +29,6 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManager;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManagerProvider;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorWebService;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index a5159a2..32ee5d8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -6,7 +6,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -14,13 +14,13 @@ import org.junit.Test;
import static org.junit.Assert.fail;
public class TestTimelineServiceClientIntegration {
- private static PerNodeAggregatorServer server;
+ private static PerNodeTimelineAggregatorsAuxService auxService;
@BeforeClass
public static void setupClass() throws Exception {
try {
- server = PerNodeAggregatorServer.launchServer(new String[0]);
- server.addApplication(ApplicationId.newInstance(0, 1));
+ auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
+ auxService.addApplication(ApplicationId.newInstance(0, 1));
} catch (ExitUtil.ExitException e) {
fail();
}
@@ -28,8 +28,8 @@ public class TestTimelineServiceClientIntegration {
@AfterClass
public static void tearDownClass() throws Exception {
- if (server != null) {
- server.stop();
+ if (auxService != null) {
+ auxService.stop();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
deleted file mode 100644
index bf72fb9..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage for a given YARN application.
- *
- * App-related lifecycle management is handled by this service.
- */
-@Private
-@Unstable
-public class AppLevelAggregatorService extends BaseAggregatorService {
- private final String applicationId;
- // TODO define key metadata such as flow metadata, user, and queue
-
- public AppLevelAggregatorService(String applicationId) {
- super(AppLevelAggregatorService.class.getName() + " - " + applicationId);
- this.applicationId = applicationId;
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- super.serviceStop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
deleted file mode 100644
index 05d321f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-
-/**
- * Class that manages adding and removing app level aggregator services and
- * their lifecycle. It provides thread safety access to the app level services.
- *
- * It is a singleton, and instances should be obtained via
- * {@link #getInstance()}.
- */
-@Private
-@Unstable
-public class AppLevelServiceManager extends CompositeService {
- private static final Log LOG =
- LogFactory.getLog(AppLevelServiceManager.class);
- private static final AppLevelServiceManager INSTANCE =
- new AppLevelServiceManager();
-
- // access to this map is synchronized with the map itself
- private final Map<String,AppLevelAggregatorService> services =
- Collections.synchronizedMap(
- new HashMap<String,AppLevelAggregatorService>());
-
- static AppLevelServiceManager getInstance() {
- return INSTANCE;
- }
-
- AppLevelServiceManager() {
- super(AppLevelServiceManager.class.getName());
- }
-
- /**
- * Creates and adds an app level aggregator service for the specified
- * application id. The service is also initialized and started. If the service
- * already exists, no new service is created.
- *
- * @throws YarnRuntimeException if there was any exception in initializing and
- * starting the app level service
- * @return whether it was added successfully
- */
- public boolean addService(String appId) {
- synchronized (services) {
- AppLevelAggregatorService service = services.get(appId);
- if (service == null) {
- try {
- service = new AppLevelAggregatorService(appId);
- // initialize, start, and add it to the parent service so it can be
- // cleaned up when the parent shuts down
- service.init(getConfig());
- service.start();
- services.put(appId, service);
- LOG.info("the application aggregator service for " + appId +
- " was added");
- return true;
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
- }
- } else {
- String msg = "the application aggregator service for " + appId +
- " already exists!";
- LOG.error(msg);
- return false;
- }
- }
- }
-
- /**
- * Removes the app level aggregator service for the specified application id.
- * The service is also stopped as a result. If the service does not exist, no
- * change is made.
- *
- * @return whether it was removed successfully
- */
- public boolean removeService(String appId) {
- synchronized (services) {
- AppLevelAggregatorService service = services.remove(appId);
- if (service == null) {
- String msg = "the application aggregator service for " + appId +
- " does not exist!";
- LOG.error(msg);
- return false;
- } else {
- // stop the service to do clean up
- service.stop();
- LOG.info("the application aggregator service for " + appId +
- " was removed");
- return true;
- }
- }
- }
-
- /**
- * Returns the app level aggregator service for the specified application id.
- *
- * @return the app level aggregator service or null if it does not exist
- */
- public AppLevelAggregatorService getService(String appId) {
- return services.get(appId);
- }
-
- /**
- * Returns whether the app level aggregator service for the specified
- * application id exists.
- */
- public boolean hasService(String appId) {
- return services.containsKey(appId);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
deleted file mode 100644
index 8768575..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import com.google.inject.Provider;
-
-/**
- * A guice provider that provides a global singleton instance of
- * AppLevelServiceManager.
- */
-public class AppLevelServiceManagerProvider
- implements Provider<AppLevelServiceManager> {
- @Override
- public AppLevelServiceManager get() {
- return AppLevelServiceManager.getInstance();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
new file mode 100644
index 0000000..95ec9f8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage for a given YARN application.
+ *
+ * App-related lifecycle management is handled by this service.
+ */
+@Private
+@Unstable
+public class AppLevelTimelineAggregator extends TimelineAggregator {
+ private final String applicationId;
+ // TODO define key metadata such as flow metadata, user, and queue
+
+ public AppLevelTimelineAggregator(String applicationId) {
+ super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId);
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
deleted file mode 100644
index e362139..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage.
- *
- * Classes that extend this can add their own lifecycle management or
- * customization of request handling.
- */
-@Private
-@Unstable
-public class BaseAggregatorService extends CompositeService {
- private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class);
-
- public BaseAggregatorService(String name) {
- super(name);
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- super.serviceStop();
- }
-
- /**
- * Handles entity writes. These writes are synchronous and are written to the
- * backing storage without buffering/batching. If any entity already exists,
- * it results in an update of the entity.
- *
- * This method should be reserved for selected critical entities and events.
- * For normal voluminous writes one should use the async method
- * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
- *
- * @param entities entities to post
- * @param callerUgi the caller UGI
- */
- public void postEntities(TimelineEntities entities,
- UserGroupInformation callerUgi) {
- // Add this output temporarily for our prototype
- // TODO remove this after we have an actual implementation
- LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE");
- LOG.info("postEntities(entities=" + entities + ", callerUgi=" +
- callerUgi + ")");
-
- // TODO implement
- if (LOG.isDebugEnabled()) {
- LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
- callerUgi + ")");
- }
- }
-
- /**
- * Handles entity writes in an asynchronous manner. The method returns as soon
- * as validation is done. No promises are made on how quickly it will be
- * written to the backing storage or if it will always be written to the
- * backing storage. Multiple writes to the same entities may be batched and
- * appropriate values updated and result in fewer writes to the backing
- * storage.
- *
- * @param entities entities to post
- * @param callerUgi the caller UGI
- */
- public void postEntitiesAsync(TimelineEntities entities,
- UserGroupInformation callerUgi) {
- // TODO implement
- if (LOG.isDebugEnabled()) {
- LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
- callerUgi + ")");
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
deleted file mode 100644
index deb21c7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.lib.StaticUserWebFilter;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.hadoop.yarn.server.api.ContainerContext;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-import org.apache.hadoop.http.HttpServer2;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
-
-/**
- * The top-level server for the per-node timeline aggregator service. Currently
- * it is defined as an auxiliary service to accommodate running within another
- * daemon (e.g. node manager).
- */
-@Private
-@Unstable
-public class PerNodeAggregatorServer extends AuxiliaryService {
- private static final Log LOG =
- LogFactory.getLog(PerNodeAggregatorServer.class);
- private static final int SHUTDOWN_HOOK_PRIORITY = 30;
- static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
-
- private final AppLevelServiceManager serviceManager;
- private HttpServer2 timelineRestServer;
-
- public PerNodeAggregatorServer() {
- // use the same singleton
- this(AppLevelServiceManager.getInstance());
- }
-
- @VisibleForTesting
- PerNodeAggregatorServer(AppLevelServiceManager serviceManager) {
- super("timeline_aggregator");
- this.serviceManager = serviceManager;
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- serviceManager.init(conf);
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- super.serviceStart();
- serviceManager.start();
- startWebApp();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- if (timelineRestServer != null) {
- timelineRestServer.stop();
- }
- // stop the service manager
- serviceManager.stop();
- super.serviceStop();
- }
-
- private void startWebApp() {
- Configuration conf = getConfig();
- // use the same ports as the old ATS for now; we could create new properties
- // for the new timeline service if needed
- String bindAddress = WebAppUtils.getWebAppBindURL(conf,
- YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
- WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
- LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
- try {
- Configuration confForInfoServer = new Configuration(conf);
- confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
- HttpServer2.Builder builder = new HttpServer2.Builder()
- .setName("timeline")
- .setConf(conf)
- .addEndpoint(URI.create("http://" + bindAddress));
- timelineRestServer = builder.build();
- // TODO: replace this by an authentication filter in future.
- HashMap<String, String> options = new HashMap<>();
- String username = conf.get(HADOOP_HTTP_STATIC_USER,
- DEFAULT_HADOOP_HTTP_STATIC_USER);
- options.put(HADOOP_HTTP_STATIC_USER, username);
- HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
- "static_user_filter_timeline",
- StaticUserWebFilter.StaticUserFilter.class.getName(),
- options, new String[] {"/*"});
-
- timelineRestServer.addJerseyResourcePackage(
- PerNodeAggregatorWebService.class.getPackage().getName() + ";"
- + GenericExceptionHandler.class.getPackage().getName() + ";"
- + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
- "/*");
- timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
- AppLevelServiceManager.getInstance());
- timelineRestServer.start();
- } catch (Exception e) {
- String msg = "The per-node aggregator webapp failed to start.";
- LOG.error(msg, e);
- throw new YarnRuntimeException(msg, e);
- }
- }
-
- // these methods can be used as the basis for future service methods if the
- // per-node aggregator runs separate from the node manager
- /**
- * Creates and adds an app level aggregator service for the specified
- * application id. The service is also initialized and started. If the service
- * already exists, no new service is created.
- *
- * @return whether it was added successfully
- */
- public boolean addApplication(ApplicationId appId) {
- String appIdString = appId.toString();
- return serviceManager.addService(appIdString);
- }
-
- /**
- * Removes the app level aggregator service for the specified application id.
- * The service is also stopped as a result. If the service does not exist, no
- * change is made.
- *
- * @return whether it was removed successfully
- */
- public boolean removeApplication(ApplicationId appId) {
- String appIdString = appId.toString();
- return serviceManager.removeService(appIdString);
- }
-
- /**
- * Creates and adds an app level aggregator service for the specified
- * application id. The service is also initialized and started. If the service
- * already exists, no new service is created.
- */
- @Override
- public void initializeContainer(ContainerInitializationContext context) {
- // intercept the event of the AM container being created and initialize the
- // app level aggregator service
- if (isApplicationMaster(context)) {
- ApplicationId appId = context.getContainerId().
- getApplicationAttemptId().getApplicationId();
- addApplication(appId);
- }
- }
-
- /**
- * Removes the app level aggregator service for the specified application id.
- * The service is also stopped as a result. If the service does not exist, no
- * change is made.
- */
- @Override
- public void stopContainer(ContainerTerminationContext context) {
- // intercept the event of the AM container being stopped and remove the app
- // level aggregator service
- if (isApplicationMaster(context)) {
- ApplicationId appId = context.getContainerId().
- getApplicationAttemptId().getApplicationId();
- removeApplication(appId);
- }
- }
-
- private boolean isApplicationMaster(ContainerContext context) {
- // TODO this is based on a (shaky) assumption that the container id (the
- // last field of the full container id) for an AM is always 1
- // we want to make this much more reliable
- ContainerId containerId = context.getContainerId();
- return containerId.getContainerId() == 1L;
- }
-
- @VisibleForTesting
- boolean hasApplication(String appId) {
- return serviceManager.hasService(appId);
- }
-
- @Override
- public void initializeApplication(ApplicationInitializationContext context) {
- }
-
- @Override
- public void stopApplication(ApplicationTerminationContext context) {
- }
-
- @Override
- public ByteBuffer getMetaData() {
- // TODO currently it is not used; we can return a more meaningful data when
- // we connect it with an AM
- return ByteBuffer.allocate(0);
- }
-
- @VisibleForTesting
- public static PerNodeAggregatorServer launchServer(String[] args) {
- Thread
- .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
- StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args,
- LOG);
- PerNodeAggregatorServer server = null;
- try {
- server = new PerNodeAggregatorServer();
- ShutdownHookManager.get().addShutdownHook(new ShutdownHook(server),
- SHUTDOWN_HOOK_PRIORITY);
- YarnConfiguration conf = new YarnConfiguration();
- server.init(conf);
- server.start();
- } catch (Throwable t) {
- LOG.fatal("Error starting PerNodeAggregatorServer", t);
- ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
- }
- return server;
- }
-
- private static class ShutdownHook implements Runnable {
- private final PerNodeAggregatorServer server;
-
- public ShutdownHook(PerNodeAggregatorServer server) {
- this.server = server;
- }
-
- public void run() {
- server.stop();
- }
- }
-
- public static void main(String[] args) {
- launchServer(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
deleted file mode 100644
index ffe099e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.*;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.webapp.ForbiddenException;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
-
-import com.google.inject.Singleton;
-
-/**
- * The main per-node REST end point for timeline service writes. It is
- * essentially a container service that routes requests to the appropriate
- * per-app services.
- */
-@Private
-@Unstable
-@Singleton
-@Path("/ws/v2/timeline")
-public class PerNodeAggregatorWebService {
- private static final Log LOG =
- LogFactory.getLog(PerNodeAggregatorWebService.class);
-
- private @Context ServletContext context;
-
- @XmlRootElement(name = "about")
- @XmlAccessorType(XmlAccessType.NONE)
- @Public
- @Unstable
- public static class AboutInfo {
-
- private String about;
-
- public AboutInfo() {
-
- }
-
- public AboutInfo(String about) {
- this.about = about;
- }
-
- @XmlElement(name = "About")
- public String getAbout() {
- return about;
- }
-
- public void setAbout(String about) {
- this.about = about;
- }
-
- }
-
- /**
- * Return the description of the timeline web services.
- */
- @GET
- @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public AboutInfo about(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res) {
- init(res);
- return new AboutInfo("Timeline API");
- }
-
- /**
- * Accepts writes to the aggregator, and returns a response. It simply routes
- * the request to the app level aggregator. It expects an application as a
- * context.
- */
- @PUT
- @Path("/entities")
- @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public Response putEntities(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- @QueryParam("async") String async,
- @QueryParam("appid") String appId,
- TimelineEntities entities) {
- init(res);
- UserGroupInformation callerUgi = getUser(req);
- if (callerUgi == null) {
- String msg = "The owner of the posted timeline entities is not set";
- LOG.error(msg);
- throw new ForbiddenException(msg);
- }
-
- // TODO how to express async posts and handle them
- boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
-
- try {
- appId = parseApplicationId(appId);
- if (appId == null) {
- return Response.status(Response.Status.BAD_REQUEST).build();
- }
- AppLevelAggregatorService service = getAggregatorService(req, appId);
- if (service == null) {
- LOG.error("Application not found");
- throw new NotFoundException(); // different exception?
- }
- service.postEntities(entities, callerUgi);
- return Response.ok().build();
- } catch (Exception e) {
- LOG.error("Error putting entities", e);
- throw new WebApplicationException(e,
- Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- private String parseApplicationId(String appId) {
- // Make sure the appId is not null and is valid
- ApplicationId appID;
- try {
- if (appId != null) {
- return ConverterUtils.toApplicationId(appId.trim()).toString();
- } else {
- return null;
- }
- } catch (Exception e) {
- return null;
- }
- }
-
- private AppLevelAggregatorService
- getAggregatorService(HttpServletRequest req, String appIdToParse) {
- String appIdString = parseApplicationId(appIdToParse);
- final AppLevelServiceManager serviceManager =
- (AppLevelServiceManager) context.getAttribute(
- PerNodeAggregatorServer.AGGREGATOR_COLLECTION_ATTR_KEY);
- return serviceManager.getService(appIdString);
- }
-
- private void init(HttpServletResponse response) {
- response.setContentType(null);
- }
-
- private UserGroupInformation getUser(HttpServletRequest req) {
- String remoteUser = req.getRemoteUser();
- UserGroupInformation callerUgi = null;
- if (remoteUser != null) {
- callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
- }
- return callerUgi;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
new file mode 100644
index 0000000..cdc4e35
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The top-level server for the per-node timeline aggregator collection. Currently
+ * it is defined as an auxiliary service to accommodate running within another
+ * daemon (e.g. node manager).
+ */
+@Private
+@Unstable
+public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
+ private static final Log LOG =
+ LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class);
+ private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+ private final TimelineAggregatorsCollection aggregatorCollection;
+
+ public PerNodeTimelineAggregatorsAuxService() {
+ // use the same singleton
+ this(TimelineAggregatorsCollection.getInstance());
+ }
+
+ @VisibleForTesting PerNodeTimelineAggregatorsAuxService(
+ TimelineAggregatorsCollection aggregatorCollection) {
+ super("timeline_aggregator");
+ this.aggregatorCollection = aggregatorCollection;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ aggregatorCollection.init(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ aggregatorCollection.start();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ aggregatorCollection.stop();
+ super.serviceStop();
+ }
+
+ // these methods can be used as the basis for future service methods if the
+ // per-node aggregator runs separate from the node manager
+ /**
+ * Creates and adds an app level aggregator for the specified application id.
+ * The aggregator is also initialized and started. If the aggregator already
+ * exists, no new aggregator is created.
+ *
+ * @return whether it was added successfully
+ */
+ public boolean addApplication(ApplicationId appId) {
+ String appIdString = appId.toString();
+ AppLevelTimelineAggregator aggregator =
+ new AppLevelTimelineAggregator(appIdString);
+ return (aggregatorCollection.putIfAbsent(appIdString, aggregator)
+ == aggregator);
+ }
+
+ /**
+ * Removes the app level aggregator for the specified application id. The
+ * aggregator is also stopped as a result. If the aggregator does not exist, no
+ * change is made.
+ *
+ * @return whether it was removed successfully
+ */
+ public boolean removeApplication(ApplicationId appId) {
+ String appIdString = appId.toString();
+ return aggregatorCollection.remove(appIdString);
+ }
+
+ /**
+ * Creates and adds an app level aggregator for the specified application id.
+ * The aggregator is also initialized and started. If the aggregator already
+ * exists, no new aggregator is created.
+ */
+ @Override
+ public void initializeContainer(ContainerInitializationContext context) {
+ // intercept the event of the AM container being created and initialize the
+ // app level aggregator service
+ if (isApplicationMaster(context)) {
+ ApplicationId appId = context.getContainerId().
+ getApplicationAttemptId().getApplicationId();
+ addApplication(appId);
+ }
+ }
+
+ /**
+ * Removes the app level aggregator for the specified application id. The
+ * aggregator is also stopped as a result. If the aggregator does not exist, no
+ * change is made.
+ */
+ @Override
+ public void stopContainer(ContainerTerminationContext context) {
+ // intercept the event of the AM container being stopped and remove the app
+ // level aggregator service
+ if (isApplicationMaster(context)) {
+ ApplicationId appId = context.getContainerId().
+ getApplicationAttemptId().getApplicationId();
+ removeApplication(appId);
+ }
+ }
+
+ private boolean isApplicationMaster(ContainerContext context) {
+ // TODO this is based on a (shaky) assumption that the container id (the
+ // last field of the full container id) for an AM is always 1
+ // we want to make this much more reliable
+ ContainerId containerId = context.getContainerId();
+ return containerId.getContainerId() == 1L;
+ }
+
+ @VisibleForTesting
+ boolean hasApplication(String appId) {
+ return aggregatorCollection.containsKey(appId);
+ }
+
+ @Override
+ public void initializeApplication(ApplicationInitializationContext context) {
+ }
+
+ @Override
+ public void stopApplication(ApplicationTerminationContext context) {
+ }
+
+ @Override
+ public ByteBuffer getMetaData() {
+ // TODO currently it is not used; we can return a more meaningful data when
+ // we connect it with an AM
+ return ByteBuffer.allocate(0);
+ }
+
+ @VisibleForTesting
+ public static PerNodeTimelineAggregatorsAuxService launchServer(String[] args) {
+ Thread
+ .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, args,
+ LOG);
+ PerNodeTimelineAggregatorsAuxService auxService = null;
+ try {
+ auxService = new PerNodeTimelineAggregatorsAuxService();
+ ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
+ SHUTDOWN_HOOK_PRIORITY);
+ YarnConfiguration conf = new YarnConfiguration();
+ auxService.init(conf);
+ auxService.start();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting PerNodeAggregatorServer", t);
+ ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
+ }
+ return auxService;
+ }
+
+ private static class ShutdownHook implements Runnable {
+ private final PerNodeTimelineAggregatorsAuxService auxService;
+
+ public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) {
+ this.auxService = auxService;
+ }
+
+ public void run() {
+ auxService.stop();
+ }
+ }
+
+ public static void main(String[] args) {
+ launchServer(args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
new file mode 100644
index 0000000..4227712
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage.
+ *
+ * Classes that extend this can add their own lifecycle management or
+ * customization of request handling.
+ */
+@Private
+@Unstable
+public abstract class TimelineAggregator extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
+
+ public TimelineAggregator(String name) {
+ super(name);
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+ /**
+ * Handles entity writes. These writes are synchronous and are written to the
+ * backing storage without buffering/batching. If any entity already exists,
+ * it results in an update of the entity.
+ *
+ * This method should be reserved for selected critical entities and events.
+ * For normal voluminous writes one should use the async method
+ * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+ *
+ * @param entities entities to post
+ * @param callerUgi the caller UGI
+ */
+ public void postEntities(TimelineEntities entities,
+ UserGroupInformation callerUgi) {
+ // Add this output temporarily for our prototype
+ // TODO remove this after we have an actual implementation
+ LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE");
+ LOG.info("postEntities(entities=" + entities + ", callerUgi=" +
+ callerUgi + ")");
+
+ // TODO implement
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
+ callerUgi + ")");
+ }
+ }
+
+ /**
+ * Handles entity writes in an asynchronous manner. The method returns as soon
+ * as validation is done. No promises are made on how quickly it will be
+ * written to the backing storage or if it will always be written to the
+ * backing storage. Multiple writes to the same entities may be batched and
+ * appropriate values updated and result in fewer writes to the backing
+ * storage.
+ *
+ * @param entities entities to post
+ * @param callerUgi the caller UGI
+ */
+ public void postEntitiesAsync(TimelineEntities entities,
+ UserGroupInformation callerUgi) {
+ // TODO implement
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
+ callerUgi + ")");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
new file mode 100644
index 0000000..7d42f94
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Singleton;
+
+/**
+ * The main per-node REST end point for timeline service writes. It is
+ * essentially a container service that routes requests to the appropriate
+ * per-app services.
+ */
+@Private
+@Unstable
+@Singleton
+@Path("/ws/v2/timeline")
+public class TimelineAggregatorWebService {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineAggregatorWebService.class);
+
+  // injected servlet context; the aggregator collection is looked up from its
+  // attributes
+  private @Context ServletContext context;
+
+  /**
+   * Bean returned by the "about" end point; it carries a single description
+   * string exposed as the "About" element.
+   */
+  @XmlRootElement(name = "about")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class AboutInfo {
+
+    private String about;
+
+    public AboutInfo() {
+
+    }
+
+    public AboutInfo(String about) {
+      this.about = about;
+    }
+
+    @XmlElement(name = "About")
+    public String getAbout() {
+      return about;
+    }
+
+    public void setAbout(String about) {
+      this.about = about;
+    }
+
+  }
+
+  /**
+   * Return the description of the timeline web services.
+   *
+   * @param req the servlet request (not used)
+   * @param res the servlet response, whose content type is reset
+   * @return a bean carrying the fixed text "Timeline API"
+   */
+  @GET
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public AboutInfo about(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res) {
+    init(res);
+    return new AboutInfo("Timeline API");
+  }
+
+  /**
+   * Accepts writes to the aggregator, and returns a response. It simply routes
+   * the request to the app level aggregator. It expects an application as a
+   * context.
+   *
+   * @param req the servlet request, used to determine the calling user
+   * @param res the servlet response, whose content type is reset
+   * @param async "true" (case-insensitive) to request an asynchronous post;
+   *          currently parsed but not acted upon
+   * @param appId the id of the application the entities belong to
+   * @param entities the entities to post
+   * @return an OK response on success, or BAD_REQUEST if appId is missing or
+   *         cannot be parsed
+   * @throws ForbiddenException if the request carries no remote user
+   * @throws NotFoundException if no aggregator is registered for the
+   *           application
+   * @throws WebApplicationException with INTERNAL_SERVER_ERROR on any other
+   *           failure
+   */
+  @PUT
+  @Path("/entities")
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public Response putEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @QueryParam("async") String async,
+      @QueryParam("appid") String appId,
+      TimelineEntities entities) {
+    init(res);
+    UserGroupInformation callerUgi = getUser(req);
+    if (callerUgi == null) {
+      String msg = "The owner of the posted timeline entities is not set";
+      LOG.error(msg);
+      throw new ForbiddenException(msg);
+    }
+
+    // TODO how to express async posts and handle them
+    boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+
+    try {
+      appId = parseApplicationId(appId);
+      if (appId == null) {
+        return Response.status(Response.Status.BAD_REQUEST).build();
+      }
+      TimelineAggregator service = getAggregatorService(req, appId);
+      if (service == null) {
+        LOG.error("Application not found");
+        throw new NotFoundException(); // different exception?
+      }
+      service.postEntities(entities, callerUgi);
+      return Response.ok().build();
+    } catch (WebApplicationException e) {
+      // already carries the intended HTTP status (e.g. the not-found case
+      // above); rethrow as is instead of masking it as a server error
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Error putting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Validates an application id string and returns its normalized form.
+   *
+   * @param appId the raw application id, may be null
+   * @return the string form of the parsed id, or null if appId is null or
+   *         cannot be parsed
+   */
+  private String parseApplicationId(String appId) {
+    if (appId == null) {
+      return null;
+    }
+    try {
+      return ConverterUtils.toApplicationId(appId.trim()).toString();
+    } catch (Exception e) {
+      // any parse failure means the id is invalid; callers treat null as a
+      // bad request
+      return null;
+    }
+  }
+
+  /**
+   * Looks up the app level aggregator in the collection published through the
+   * servlet context.
+   *
+   * @param req the servlet request (not used)
+   * @param appId the application id, already normalized by
+   *          {@link #parseApplicationId(String)}
+   * @return the aggregator for the application, or null if none is registered
+   */
+  private TimelineAggregator
+      getAggregatorService(HttpServletRequest req, String appId) {
+    final TimelineAggregatorsCollection aggregatorCollection =
+        (TimelineAggregatorsCollection) context.getAttribute(
+            TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY);
+    return aggregatorCollection.get(appId);
+  }
+
+  /**
+   * Resets the content type of the response before a request is handled.
+   */
+  private void init(HttpServletResponse response) {
+    // NOTE(review): presumably lets the JAX-RS layer choose the content type;
+    // confirm
+    response.setContentType(null);
+  }
+
+  /**
+   * Builds the caller UGI from the remote user of the request.
+   *
+   * @return the UGI of the remote user, or null if the request has none
+   */
+  private UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUgi = null;
+    if (remoteUser != null) {
+      callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUgi;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
new file mode 100644
index 0000000..73b6d52
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+/**
+ * Class that manages adding and removing aggregators and their lifecycle. It
+ * provides thread-safe access to the aggregators inside.
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ */
+@Private
+@Unstable
+public class TimelineAggregatorsCollection extends CompositeService {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineAggregatorsCollection.class);
+  private static final TimelineAggregatorsCollection INSTANCE =
+      new TimelineAggregatorsCollection();
+
+  // access to this map is synchronized with the map itself
+  private final Map<String, TimelineAggregator> aggregators =
+      Collections.synchronizedMap(
+          new HashMap<String, TimelineAggregator>());
+
+  // REST server for this aggregator collection
+  private HttpServer2 timelineRestServer;
+
+  // servlet context attribute under which the collection is published to the
+  // web service
+  static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
+
+  /**
+   * Returns the shared instance of this collection.
+   */
+  static TimelineAggregatorsCollection getInstance() {
+    return INSTANCE;
+  }
+
+  TimelineAggregatorsCollection() {
+    super(TimelineAggregatorsCollection.class.getName());
+  }
+
+  /**
+   * Starts the REST server first and then the composite service itself.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    startWebApp();
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the REST server, then every registered aggregator, and finally the
+   * composite service itself. Each later step runs even if an earlier one
+   * fails.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      // stop the REST server first so that no new writes are routed to
+      // aggregators that are about to be stopped
+      if (timelineRestServer != null) {
+        timelineRestServer.stop();
+      }
+    } finally {
+      try {
+        stopAllAggregators();
+      } finally {
+        super.serviceStop();
+      }
+    }
+  }
+
+  /**
+   * Stops and unregisters every aggregator still held by this collection. A
+   * failure to stop one aggregator is logged and does not prevent the others
+   * from being stopped.
+   */
+  private void stopAllAggregators() {
+    synchronized (aggregators) {
+      for (Map.Entry<String, TimelineAggregator> entry
+          : aggregators.entrySet()) {
+        try {
+          entry.getValue().stop();
+        } catch (Exception e) {
+          LOG.warn("failed to stop the aggregator for " + entry.getKey(), e);
+        }
+      }
+      aggregators.clear();
+    }
+  }
+
+  /**
+   * Put the aggregator into the collection if an aggregator mapped by id does
+   * not exist. A newly added aggregator is initialized with this service's
+   * configuration and started before it becomes visible in the collection.
+   *
+   * @param id the id to register the aggregator under
+   * @param aggregator the aggregator to initialize, start and register
+   * @throws YarnRuntimeException if there was any exception in initializing and
+   * starting the app level service
+   * @return the aggregator associated with id after the potential put.
+   */
+  public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) {
+    synchronized (aggregators) {
+      TimelineAggregator aggregatorInTable = aggregators.get(id);
+      if (aggregatorInTable == null) {
+        try {
+          // initialize, start, and add it to the collection so it can be
+          // cleaned up when the parent shuts down
+          aggregator.init(getConfig());
+          aggregator.start();
+          aggregators.put(id, aggregator);
+          LOG.info("the aggregator for " + id + " was added");
+          return aggregator;
+        } catch (Exception e) {
+          throw new YarnRuntimeException(e);
+        }
+      } else {
+        String msg = "the aggregator for " + id + " already exists!";
+        LOG.error(msg);
+        return aggregatorInTable;
+      }
+    }
+  }
+
+  /**
+   * Removes the aggregator for the specified id. The aggregator is also stopped
+   * as a result. If the aggregator does not exist, no change is made.
+   *
+   * @param id the id of the aggregator to remove
+   * @return whether it was removed successfully
+   */
+  public boolean remove(String id) {
+    synchronized (aggregators) {
+      TimelineAggregator aggregator = aggregators.remove(id);
+      if (aggregator == null) {
+        String msg = "the aggregator for " + id + " does not exist!";
+        LOG.error(msg);
+        return false;
+      } else {
+        // stop the service to do clean up
+        aggregator.stop();
+        LOG.info("the aggregator service for " + id + " was removed");
+        return true;
+      }
+    }
+  }
+
+  /**
+   * Returns the aggregator for the specified id.
+   *
+   * @param id the id of the aggregator to look up
+   * @return the aggregator or null if it does not exist
+   */
+  public TimelineAggregator get(String id) {
+    return aggregators.get(id);
+  }
+
+  /**
+   * Returns whether the aggregator for the specified id exists in this
+   * collection.
+   *
+   * @param id the id of the aggregator to look up
+   */
+  public boolean containsKey(String id) {
+    return aggregators.containsKey(id);
+  }
+
+  /**
+   * Launch the REST web server for this aggregator collection
+   *
+   * @throws YarnRuntimeException if the server cannot be built or started
+   */
+  private void startWebApp() {
+    Configuration conf = getConfig();
+    // use the same ports as the old ATS for now; we could create new properties
+    // for the new timeline service if needed
+    String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+        YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+        WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
+    LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
+    try {
+      // cap the server's thread pool on a private copy of the configuration so
+      // that the service's own configuration is left untouched; the copy is
+      // what must be handed to the builder for the cap to take effect
+      Configuration confForInfoServer = new Configuration(conf);
+      confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
+      HttpServer2.Builder builder = new HttpServer2.Builder()
+          .setName("timeline")
+          .setConf(confForInfoServer)
+          .addEndpoint(URI.create("http://" + bindAddress));
+      timelineRestServer = builder.build();
+      // TODO: replace this by an authentication filter in future.
+      Map<String, String> options = new HashMap<>();
+      String username = conf.get(HADOOP_HTTP_STATIC_USER,
+          DEFAULT_HADOOP_HTTP_STATIC_USER);
+      options.put(HADOOP_HTTP_STATIC_USER, username);
+      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
+          "static_user_filter_timeline",
+          StaticUserWebFilter.StaticUserFilter.class.getName(),
+          options, new String[] {"/*"});
+
+      timelineRestServer.addJerseyResourcePackage(
+          TimelineAggregatorWebService.class.getPackage().getName() + ";"
+              + GenericExceptionHandler.class.getPackage().getName() + ";"
+              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
+          "/*");
+      // publish the collection that owns this server, so the web service
+      // resolves aggregators against the same instance that registered them
+      timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY, this);
+      timelineRestServer.start();
+    } catch (Exception e) {
+      String msg = "The per-node aggregator webapp failed to start.";
+      LOG.error(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
deleted file mode 100644
index c0af8c5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-
-public class TestAppLevelAggregatorService {
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
deleted file mode 100644
index 3f981c7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-public class TestAppLevelServiceManager {
-
- @Test(timeout=60000)
- public void testMultithreadedAdd() throws Exception {
- final AppLevelServiceManager serviceManager =
- spy(new AppLevelServiceManager());
- doReturn(new Configuration()).when(serviceManager).getConfig();
-
- final int NUM_APPS = 5;
- List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
- for (int i = 0; i < NUM_APPS; i++) {
- final String appId = String.valueOf(i);
- Callable<Boolean> task = new Callable<Boolean>() {
- public Boolean call() {
- return serviceManager.addService(appId);
- }
- };
- tasks.add(task);
- }
- ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
- try {
- List<Future<Boolean>> futures = executor.invokeAll(tasks);
- for (Future<Boolean> future: futures) {
- assertTrue(future.get());
- }
- } finally {
- executor.shutdownNow();
- }
- // check the keys
- for (int i = 0; i < NUM_APPS; i++) {
- assertTrue(serviceManager.hasService(String.valueOf(i)));
- }
- }
-
- @Test
- public void testMultithreadedAddAndRemove() throws Exception {
- final AppLevelServiceManager serviceManager =
- spy(new AppLevelServiceManager());
- doReturn(new Configuration()).when(serviceManager).getConfig();
-
- final int NUM_APPS = 5;
- List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
- for (int i = 0; i < NUM_APPS; i++) {
- final String appId = String.valueOf(i);
- Callable<Boolean> task = new Callable<Boolean>() {
- public Boolean call() {
- return serviceManager.addService(appId) &&
- serviceManager.removeService(appId);
- }
- };
- tasks.add(task);
- }
- ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
- try {
- List<Future<Boolean>> futures = executor.invokeAll(tasks);
- for (Future<Boolean> future: futures) {
- assertTrue(future.get());
- }
- } finally {
- executor.shutdownNow();
- }
- // check the keys
- for (int i = 0; i < NUM_APPS; i++) {
- assertFalse(serviceManager.hasService(String.valueOf(i)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
new file mode 100644
index 0000000..8f95814
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+
+/**
+ * Empty placeholder test class; no test cases are defined yet. Presumably
+ * intended to hold the tests for AppLevelTimelineAggregator (TODO confirm).
+ */
+public class TestAppLevelTimelineAggregator {
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3ff7f06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
deleted file mode 100644
index 55953cd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-public class TestBaseAggregatorService {
-
-}