You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/03/19 19:33:28 UTC
[1/4] hadoop git commit: YARN-3333. Rename TimelineAggregator etc. to
TimelineCollector. Contributed by Sangjin Lee
Repository: hadoop
Updated Branches:
refs/heads/YARN-2928 8a637914c -> dda84085c
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
new file mode 100644
index 0000000..59ecef1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Top-level server that hosts the per-node timeline collector manager. For now
+ * it is packaged as an auxiliary service so that it can run inside another
+ * daemon (e.g. node manager).
+ */
+@Private
+@Unstable
+public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
+ private static final Log LOG =
+ LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class);
+ private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+ private final TimelineCollectorManager collectorManager;
+
+ public PerNodeTimelineCollectorsAuxService() {
+ // share the singleton collector manager
+ this(TimelineCollectorManager.getInstance());
+ }
+
+ @VisibleForTesting PerNodeTimelineCollectorsAuxService(
+ TimelineCollectorManager collectorsManager) {
+ super("timeline_collector");
+ this.collectorManager = collectorsManager;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ // the collector manager is initialized before this service itself
+ collectorManager.init(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ // the collector manager is started before this service itself
+ collectorManager.start();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ // the collector manager is stopped before this service itself
+ collectorManager.stop();
+ super.serviceStop();
+ }
+
+ // these methods can be used as the basis for future service methods if the
+ // per-node collector runs separate from the node manager
+ /**
+ * Builds an app level collector for the given application id and hands it to
+ * the collector manager, which registers it only if none is mapped to that
+ * id yet.
+ *
+ * @return true only if the collector created by this call is the one that
+ * the manager now holds for the application
+ */
+ public boolean addApplication(ApplicationId appId) {
+ AppLevelTimelineCollector newCollector =
+ new AppLevelTimelineCollector(appId.toString());
+ TimelineCollector registered =
+ collectorManager.putIfAbsent(appId, newCollector);
+ // identity comparison: some other instance means one already existed
+ return registered == newCollector;
+ }
+
+ /**
+ * Asks the collector manager to remove the collector registered under the
+ * string form of the given application id.
+ *
+ * @return whether it was removed successfully
+ */
+ public boolean removeApplication(ApplicationId appId) {
+ return collectorManager.remove(appId.toString());
+ }
+
+ /**
+ * Adds an app level collector when the container being initialized is
+ * recognized as the application master; other containers are ignored.
+ */
+ @Override
+ public void initializeContainer(ContainerInitializationContext context) {
+ // only the AM container triggers creation of the app level collector
+ if (!isApplicationMaster(context)) {
+ return;
+ }
+ ContainerId amContainerId = context.getContainerId();
+ addApplication(
+ amContainerId.getApplicationAttemptId().getApplicationId());
+ }
+
+ /**
+ * Removes the app level collector when the container being stopped is
+ * recognized as the application master; other containers are ignored.
+ */
+ @Override
+ public void stopContainer(ContainerTerminationContext context) {
+ // only the AM container triggers removal of the app level collector
+ if (!isApplicationMaster(context)) {
+ return;
+ }
+ ContainerId amContainerId = context.getContainerId();
+ removeApplication(
+ amContainerId.getApplicationAttemptId().getApplicationId());
+ }
+
+ private boolean isApplicationMaster(ContainerContext context) {
+ // TODO this is based on a (shaky) assumption that the container id (the
+ // last field of the full container id) for an AM is always 1
+ // we want to make this much more reliable
+ return context.getContainerId().getContainerId() == 1L;
+ }
+
+ @VisibleForTesting
+ boolean hasApplication(String appId) {
+ return collectorManager.containsKey(appId);
+ }
+
+ @Override
+ public void initializeApplication(ApplicationInitializationContext context) {
+ // nothing to do at the application level
+ }
+
+ @Override
+ public void stopApplication(ApplicationTerminationContext context) {
+ // nothing to do at the application level
+ }
+
+ @Override
+ public ByteBuffer getMetaData() {
+ // TODO currently it is not used; we can return a more meaningful data when
+ // we connect it with an AM
+ return ByteBuffer.allocate(0);
+ }
+
+ /**
+ * Creates the aux service, registers a shutdown hook that stops it, then
+ * initializes and starts it with a fresh {@link YarnConfiguration}. Any
+ * failure is logged and handed to {@link ExitUtil#terminate(int, String)}.
+ *
+ * @param args command line arguments, used for the startup message
+ * @param collectorManager manager to use, or null to use the singleton
+ * @return the service instance, or null if it could not be constructed
+ */
+ @VisibleForTesting
+ public static PerNodeTimelineCollectorsAuxService
+ launchServer(String[] args, TimelineCollectorManager collectorManager) {
+ Thread
+ .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ StringUtils.startupShutdownMessage(
+ PerNodeTimelineCollectorsAuxService.class, args, LOG);
+ PerNodeTimelineCollectorsAuxService auxService = null;
+ try {
+ if (collectorManager == null) {
+ auxService = new PerNodeTimelineCollectorsAuxService();
+ } else {
+ auxService =
+ new PerNodeTimelineCollectorsAuxService(collectorManager);
+ }
+ ShutdownHook stopOnExit = new ShutdownHook(auxService);
+ ShutdownHookManager.get().addShutdownHook(stopOnExit,
+ SHUTDOWN_HOOK_PRIORITY);
+ auxService.init(new YarnConfiguration());
+ auxService.start();
+ } catch (Throwable t) {
+ LOG.fatal("Error starting PerNodeTimelineCollectorServer", t);
+ ExitUtil.terminate(-1, "Error starting PerNodeTimelineCollectorServer");
+ }
+ return auxService;
+ }
+
+ /** Shutdown hook that stops the aux service it was created with. */
+ private static class ShutdownHook implements Runnable {
+ private final PerNodeTimelineCollectorsAuxService auxService;
+
+ public ShutdownHook(PerNodeTimelineCollectorsAuxService auxService) {
+ this.auxService = auxService;
+ }
+
+ @Override
+ public void run() {
+ auxService.stop();
+ }
+ }
+
+ public static void main(String[] args) {
+ launchServer(args, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
new file mode 100644
index 0000000..6e20e69
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage through a {@link TimelineWriter}.
+ *
+ * Classes that extend this can add their own lifecycle management or
+ * customization of request handling.
+ */
+@Private
+@Unstable
+public abstract class TimelineCollector extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
+
+ // assigned in serviceInit(); stays null if init never ran or failed early
+ private TimelineWriter writer;
+
+ public TimelineCollector(String name) {
+ super(name);
+ }
+
+ /**
+ * Instantiates the writer class configured under
+ * {@link YarnConfiguration#TIMELINE_SERVICE_WRITER_CLASS} (falling back to
+ * {@link FileSystemTimelineWriterImpl}) and initializes it.
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ writer = ReflectionUtils.newInstance(conf.getClass(
+ YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+ FileSystemTimelineWriterImpl.class,
+ TimelineWriter.class), conf);
+ writer.init(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ /**
+ * Stops this service and then the writer. The writer may be null here when
+ * stop is invoked after a failed or skipped init, so it is checked first.
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ if (writer != null) {
+ writer.stop();
+ }
+ }
+
+ /**
+ * @return the writer created during init, or null if init has not completed
+ */
+ public TimelineWriter getWriter() {
+ return writer;
+ }
+
+ /**
+ * Handles entity writes. These writes are synchronous and are written to the
+ * backing storage without buffering/batching. If any entity already exists,
+ * it results in an update of the entity.
+ *
+ * This method should be reserved for selected critical entities and events.
+ * For normal voluminous writes one should use the async method
+ * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+ *
+ * @param entities entities to post
+ * @param callerUgi the caller UGI; currently only used for debug logging
+ * @return the response that contains the result of the post.
+ * @throws IOException if the underlying writer fails
+ */
+ public TimelineWriteResponse postEntities(TimelineEntities entities,
+ UserGroupInformation callerUgi) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
+ LOG.debug("postEntities(entities=" + entities + ", callerUgi="
+ + callerUgi + ")");
+ }
+
+ return writer.write(entities);
+ }
+
+ /**
+ * Handles entity writes in an asynchronous manner. The method returns as soon
+ * as validation is done. No promises are made on how quickly it will be
+ * written to the backing storage or if it will always be written to the
+ * backing storage. Multiple writes to the same entities may be batched and
+ * appropriate values updated and result in fewer writes to the backing
+ * storage.
+ *
+ * Not implemented yet: the call is only logged at debug level and nothing is
+ * written.
+ *
+ * @param entities entities to post
+ * @param callerUgi the caller UGI
+ */
+ public void postEntitiesAsync(TimelineEntities entities,
+ UserGroupInformation callerUgi) {
+ // TODO implement
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
+ callerUgi + ")");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
new file mode 100644
index 0000000..3691162
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Class that manages adding and removing collectors and their lifecycle. It
+ * provides thread safety access to the collectors inside.
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ */
+@Private
+@Unstable
+public class TimelineCollectorManager extends CompositeService {
+ private static final Log LOG =
+ LogFactory.getLog(TimelineCollectorManager.class);
+ private static final TimelineCollectorManager INSTANCE =
+ new TimelineCollectorManager();
+
+ // access to this map is synchronized with the map itself
+ private final Map<String, TimelineCollector> collectors =
+ Collections.synchronizedMap(
+ new HashMap<String, TimelineCollector>());
+
+ // REST server for this collector manager
+ private HttpServer2 timelineRestServer;
+
+ private String timelineRestServerBindAddress;
+
+ private CollectorNodemanagerProtocol nmCollectorService;
+
+ private InetSocketAddress nmCollectorServiceAddress;
+
+ static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
+
+ static TimelineCollectorManager getInstance() {
+ return INSTANCE;
+ }
+
+ @VisibleForTesting
+ protected TimelineCollectorManager() {
+ super(TimelineCollectorManager.class.getName());
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ this.nmCollectorServiceAddress = conf.getSocketAddr(
+ YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
+
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ startWebApp();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (timelineRestServer != null) {
+ timelineRestServer.stop();
+ }
+ super.serviceStop();
+ }
+
+ /**
+ * Put the collector into the collection if an collector mapped by id does
+ * not exist.
+ *
+ * @throws YarnRuntimeException if there was any exception in initializing and
+ * starting the app level service
+ * @return the collector associated with id after the potential put.
+ */
+ public TimelineCollector putIfAbsent(ApplicationId appId,
+ TimelineCollector collector) {
+ String id = appId.toString();
+ TimelineCollector collectorInTable;
+ boolean collectorIsNew = false;
+ synchronized (collectors) {
+ collectorInTable = collectors.get(id);
+ if (collectorInTable == null) {
+ try {
+ // initialize, start, and add it to the collection so it can be
+ // cleaned up when the parent shuts down
+ collector.init(getConfig());
+ collector.start();
+ collectors.put(id, collector);
+ LOG.info("the collector for " + id + " was added");
+ collectorInTable = collector;
+ collectorIsNew = true;
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ } else {
+ String msg = "the collector for " + id + " already exists!";
+ LOG.error(msg);
+ }
+
+ }
+ // Report to NM if a new collector is added.
+ if (collectorIsNew) {
+ try {
+ reportNewCollectorToNM(appId);
+ } catch (Exception e) {
+ // throw exception here as it cannot be used if failed report to NM
+ LOG.error("Failed to report a new collector for application: " + appId +
+ " to the NM Collector Service.");
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ return collectorInTable;
+ }
+
+ /**
+ * Removes the collector for the specified id. The collector is also stopped
+ * as a result. If the collector does not exist, no change is made.
+ *
+ * @return whether it was removed successfully
+ */
+ public boolean remove(String id) {
+ synchronized (collectors) {
+ TimelineCollector collector = collectors.remove(id);
+ if (collector == null) {
+ String msg = "the collector for " + id + " does not exist!";
+ LOG.error(msg);
+ return false;
+ } else {
+ // stop the service to do clean up
+ collector.stop();
+ LOG.info("the collector service for " + id + " was removed");
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Returns the collector for the specified id.
+ *
+ * @return the collector or null if it does not exist
+ */
+ public TimelineCollector get(String id) {
+ return collectors.get(id);
+ }
+
+ /**
+ * Returns whether the collector for the specified id exists in this
+ * collection.
+ */
+ public boolean containsKey(String id) {
+ return collectors.containsKey(id);
+ }
+
+ /**
+ * Launch the REST web server for this collector manager
+ */
+ private void startWebApp() {
+ Configuration conf = getConfig();
+ // use the same ports as the old ATS for now; we could create new properties
+ // for the new timeline service if needed
+ String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+ YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+ WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
+ this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
+ NetUtils.createSocketAddr(bindAddress));
+ LOG.info("Instantiating the per-node collector webapp at " +
+ timelineRestServerBindAddress);
+ try {
+ Configuration confForInfoServer = new Configuration(conf);
+ confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
+ HttpServer2.Builder builder = new HttpServer2.Builder()
+ .setName("timeline")
+ .setConf(conf)
+ .addEndpoint(URI.create("http://" + bindAddress));
+ timelineRestServer = builder.build();
+ // TODO: replace this by an authentication filter in future.
+ HashMap<String, String> options = new HashMap<>();
+ String username = conf.get(HADOOP_HTTP_STATIC_USER,
+ DEFAULT_HADOOP_HTTP_STATIC_USER);
+ options.put(HADOOP_HTTP_STATIC_USER, username);
+ HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
+ "static_user_filter_timeline",
+ StaticUserWebFilter.StaticUserFilter.class.getName(),
+ options, new String[] {"/*"});
+
+ timelineRestServer.addJerseyResourcePackage(
+ TimelineCollectorWebService.class.getPackage().getName() + ";"
+ + GenericExceptionHandler.class.getPackage().getName() + ";"
+ + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
+ "/*");
+ timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY,
+ TimelineCollectorManager.getInstance());
+ timelineRestServer.start();
+ } catch (Exception e) {
+ String msg = "The per-node collector webapp failed to start.";
+ LOG.error(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ }
+
+ private void reportNewCollectorToNM(ApplicationId appId)
+ throws YarnException, IOException {
+ this.nmCollectorService = getNMCollectorService();
+ ReportNewCollectorInfoRequest request =
+ ReportNewCollectorInfoRequest.newInstance(appId,
+ this.timelineRestServerBindAddress);
+ LOG.info("Report a new collector for application: " + appId +
+ " to the NM Collector Service.");
+ nmCollectorService.reportNewCollectorInfo(request);
+ }
+
+ @VisibleForTesting
+ protected CollectorNodemanagerProtocol getNMCollectorService() {
+ Configuration conf = getConfig();
+ final YarnRPC rpc = YarnRPC.create(conf);
+
+ // TODO Security settings.
+ return (CollectorNodemanagerProtocol) rpc.getProxy(
+ CollectorNodemanagerProtocol.class,
+ nmCollectorServiceAddress, conf);
+ }
+
+ @VisibleForTesting
+ public String getRestServerBindAddress() {
+ return timelineRestServerBindAddress;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
new file mode 100644
index 0000000..5adae71
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Singleton;
+
+/**
+ * The main per-node REST end point for timeline service writes. It is
+ * essentially a container service that routes requests to the appropriate
+ * per-app services.
+ */
+@Private
+@Unstable
+@Singleton
+@Path("/ws/v2/timeline")
+public class TimelineCollectorWebService {
+ private static final Log LOG =
+ LogFactory.getLog(TimelineCollectorWebService.class);
+
+ // servlet context; the collector manager is looked up from its attributes
+ private @Context ServletContext context;
+
+ /** Simple bean carrying the description returned by the "about" call. */
+ @XmlRootElement(name = "about")
+ @XmlAccessorType(XmlAccessType.NONE)
+ @Public
+ @Unstable
+ public static class AboutInfo {
+
+ private String about;
+
+ public AboutInfo() {
+
+ }
+
+ public AboutInfo(String about) {
+ this.about = about;
+ }
+
+ @XmlElement(name = "About")
+ public String getAbout() {
+ return about;
+ }
+
+ public void setAbout(String about) {
+ this.about = about;
+ }
+
+ }
+
+ /**
+ * Return the description of the timeline web services.
+ */
+ @GET
+ @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public AboutInfo about(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res) {
+ init(res);
+ return new AboutInfo("Timeline Collector API");
+ }
+
+ /**
+ * Accepts writes to the collector, and returns a response. It simply routes
+ * the request to the app level collector. It expects an application as a
+ * context.
+ *
+ * @param async whether the post is asynchronous; parsed but not acted on yet
+ * @param appId the application id the entities belong to
+ * @param entities the entities to write
+ * @return an OK response on success, or BAD_REQUEST if the application id is
+ * missing or cannot be parsed
+ * @throws ForbiddenException if the request carries no remote user
+ * @throws NotFoundException if no collector is registered for the
+ * application
+ * @throws WebApplicationException with INTERNAL_SERVER_ERROR for any other
+ * failure
+ */
+ @PUT
+ @Path("/entities")
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public Response putEntities(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @QueryParam("async") String async,
+ @QueryParam("appid") String appId,
+ TimelineEntities entities) {
+ init(res);
+ UserGroupInformation callerUgi = getUser(req);
+ if (callerUgi == null) {
+ String msg = "The owner of the posted timeline entities is not set";
+ LOG.error(msg);
+ throw new ForbiddenException(msg);
+ }
+
+ // TODO how to express async posts and handle them
+ boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+
+ try {
+ appId = parseApplicationId(appId);
+ if (appId == null) {
+ return Response.status(Response.Status.BAD_REQUEST).build();
+ }
+ TimelineCollector collector = getCollector(req, appId);
+ if (collector == null) {
+ LOG.error("Application not found");
+ throw new NotFoundException(); // different exception?
+ }
+ collector.postEntities(entities, callerUgi);
+ return Response.ok().build();
+ } catch (WebApplicationException e) {
+ // already carries its own HTTP status (e.g. the not-found case above);
+ // rethrow as is so the generic handler below does not turn it into a 500
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Error putting entities", e);
+ throw new WebApplicationException(e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ /**
+ * Normalizes an application id string by parsing it and converting it back
+ * to a string.
+ *
+ * @return the normalized id, or null if the input is null or fails to parse
+ */
+ private String parseApplicationId(String appId) {
+ try {
+ if (appId != null) {
+ return ConverterUtils.toApplicationId(appId.trim()).toString();
+ } else {
+ return null;
+ }
+ } catch (Exception e) {
+ // an unparseable id is reported to the caller as null
+ return null;
+ }
+ }
+
+ /**
+ * Looks up the collector for the given application id in the collector
+ * manager published in the servlet context.
+ *
+ * @return the collector, or null if the manager holds none for that id
+ */
+ private TimelineCollector
+ getCollector(HttpServletRequest req, String appIdToParse) {
+ String appIdString = parseApplicationId(appIdToParse);
+ final TimelineCollectorManager collectorManager =
+ (TimelineCollectorManager) context.getAttribute(
+ TimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
+ return collectorManager.get(appIdString);
+ }
+
+ /** Clears the content type of the response. */
+ private void init(HttpServletResponse response) {
+ response.setContentType(null);
+ }
+
+ /**
+ * Builds a UGI from the remote user of the request.
+ *
+ * @return the UGI, or null if the request has no remote user
+ */
+ private UserGroupInformation getUser(HttpServletRequest req) {
+ String remoteUser = req.getRemoteUser();
+ UserGroupInformation callerUgi = null;
+ if (remoteUser != null) {
+ callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
+ }
+ return callerUgi;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 4a57e97..f5603f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -65,7 +65,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
* Stores the entire information in {@link TimelineEntity} to the
* timeline store. Any errors occurring for individual write request objects
* will be reported in the response.
- *
+ *
* @param data
* a {@link TimelineEntity} object
* @return {@link TimelineWriteResponse} object.
@@ -116,10 +116,10 @@ public class FileSystemTimelineWriterImpl extends AbstractService
* Aggregates the entity information to the timeline store based on which
* track this entity is to be rolled up to. The tracks along which aggregations
* are to be done are given by {@link TimelineAggregationTrack}
- *
+ *
* Any errors occurring for individual write request objects will be reported
* in the response.
- *
+ *
* @param data
* a {@link TimelineEntity} object
* a {@link TimelineAggregationTrack} enum value
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
deleted file mode 100644
index 8f95814..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-
-public class TestAppLevelTimelineAggregator {
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
deleted file mode 100644
index 1c89ead..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.junit.Test;
-
-public class TestPerNodeTimelineAggregatorsAuxService {
- private ApplicationAttemptId appAttemptId;
-
- public TestPerNodeTimelineAggregatorsAuxService() {
- ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
- appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- }
-
- @Test
- public void testAddApplication() throws Exception {
- PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
- // auxService should have a single app
- assertTrue(auxService.hasApplication(
- appAttemptId.getApplicationId().toString()));
- auxService.close();
- }
-
- @Test
- public void testAddApplicationNonAMContainer() throws Exception {
- PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
-
- ContainerId containerId = getContainerId(2L); // not an AM
- ContainerInitializationContext context =
- mock(ContainerInitializationContext.class);
- when(context.getContainerId()).thenReturn(containerId);
- auxService.initializeContainer(context);
- // auxService should not have that app
- assertFalse(auxService.hasApplication(
- appAttemptId.getApplicationId().toString()));
- }
-
- @Test
- public void testRemoveApplication() throws Exception {
- PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
- // auxService should have a single app
- String appIdStr = appAttemptId.getApplicationId().toString();
- assertTrue(auxService.hasApplication(appIdStr));
-
- ContainerId containerId = getAMContainerId();
- ContainerTerminationContext context =
- mock(ContainerTerminationContext.class);
- when(context.getContainerId()).thenReturn(containerId);
- auxService.stopContainer(context);
- // auxService should not have that app
- assertFalse(auxService.hasApplication(appIdStr));
- auxService.close();
- }
-
- @Test
- public void testRemoveApplicationNonAMContainer() throws Exception {
- PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
- // auxService should have a single app
- String appIdStr = appAttemptId.getApplicationId().toString();
- assertTrue(auxService.hasApplication(appIdStr));
-
- ContainerId containerId = getContainerId(2L); // not an AM
- ContainerTerminationContext context =
- mock(ContainerTerminationContext.class);
- when(context.getContainerId()).thenReturn(containerId);
- auxService.stopContainer(context);
- // auxService should still have that app
- assertTrue(auxService.hasApplication(appIdStr));
- auxService.close();
- }
-
- @Test(timeout = 60000)
- public void testLaunch() throws Exception {
- ExitUtil.disableSystemExit();
- PerNodeTimelineAggregatorsAuxService auxService = null;
- try {
- auxService =
- PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
- } catch (ExitUtil.ExitException e) {
- assertEquals(0, e.status);
- ExitUtil.resetFirstExitException();
- fail();
- } finally {
- if (auxService != null) {
- auxService.stop();
- }
- }
- }
-
- private PerNodeTimelineAggregatorsAuxService createAggregatorAndAddApplication() {
- PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
- // create an AM container
- ContainerId containerId = getAMContainerId();
- ContainerInitializationContext context =
- mock(ContainerInitializationContext.class);
- when(context.getContainerId()).thenReturn(containerId);
- auxService.initializeContainer(context);
- return auxService;
- }
-
- private PerNodeTimelineAggregatorsAuxService createAggregator() {
- TimelineAggregatorsCollection
- aggregatorsCollection = spy(new TimelineAggregatorsCollection());
- doReturn(new Configuration()).when(aggregatorsCollection).getConfig();
- PerNodeTimelineAggregatorsAuxService auxService =
- spy(new PerNodeTimelineAggregatorsAuxService(aggregatorsCollection));
- return auxService;
- }
-
- private ContainerId getAMContainerId() {
- return getContainerId(1L);
- }
-
- private ContainerId getContainerId(long id) {
- return ContainerId.newContainerId(appAttemptId, id);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
deleted file mode 100644
index dd64629..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.junit.Test;
-
-public class TestTimelineAggregatorsCollection {
-
- @Test(timeout=60000)
- public void testMultithreadedAdd() throws Exception {
- final TimelineAggregatorsCollection aggregatorCollection =
- spy(new TimelineAggregatorsCollection());
- doReturn(new Configuration()).when(aggregatorCollection).getConfig();
-
- final int NUM_APPS = 5;
- List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
- for (int i = 0; i < NUM_APPS; i++) {
- final ApplicationId appId = ApplicationId.newInstance(0L, i);
- Callable<Boolean> task = new Callable<Boolean>() {
- public Boolean call() {
- AppLevelTimelineAggregator aggregator =
- new AppLevelTimelineAggregator(appId.toString());
- return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
- }
- };
- tasks.add(task);
- }
- ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
- try {
- List<Future<Boolean>> futures = executor.invokeAll(tasks);
- for (Future<Boolean> future: futures) {
- assertTrue(future.get());
- }
- } finally {
- executor.shutdownNow();
- }
- // check the keys
- for (int i = 0; i < NUM_APPS; i++) {
- assertTrue(aggregatorCollection.containsKey(String.valueOf(i)));
- }
- }
-
- @Test
- public void testMultithreadedAddAndRemove() throws Exception {
- final TimelineAggregatorsCollection aggregatorCollection =
- spy(new TimelineAggregatorsCollection());
- doReturn(new Configuration()).when(aggregatorCollection).getConfig();
-
- final int NUM_APPS = 5;
- List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
- for (int i = 0; i < NUM_APPS; i++) {
- final ApplicationId appId = ApplicationId.newInstance(0L, i);
- Callable<Boolean> task = new Callable<Boolean>() {
- public Boolean call() {
- AppLevelTimelineAggregator aggregator =
- new AppLevelTimelineAggregator(appId.toString());
- boolean successPut =
- (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
- return successPut && aggregatorCollection.remove(appId.toString());
- }
- };
- tasks.add(task);
- }
- ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
- try {
- List<Future<Boolean>> futures = executor.invokeAll(tasks);
- for (Future<Boolean> future: futures) {
- assertTrue(future.get());
- }
- } finally {
- executor.shutdownNow();
- }
- // check the keys
- for (int i = 0; i < NUM_APPS; i++) {
- assertFalse(aggregatorCollection.containsKey(String.valueOf(i)));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java
new file mode 100644
index 0000000..74c81a7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+
+/**
+ * Empty placeholder: this class currently declares no test methods.
+ */
+public class TestAppLevelTimelineCollector {
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
new file mode 100644
index 0000000..3b20352
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.junit.Test;
+
+/**
+ * Exercises how {@link PerNodeTimelineCollectorsAuxService} tracks
+ * applications as containers are initialized and stopped, and checks that the
+ * service can be launched through {@code launchServer}.
+ *
+ * Throughout these tests the container with id 1 plays the role of the AM
+ * container and the container with id 2 is a non-AM container.
+ */
+public class TestPerNodeTimelineCollectorsAuxService {
+  /** Attempt id shared by every container id created in this test. */
+  private final ApplicationAttemptId appAttemptId;
+
+  public TestPerNodeTimelineCollectorsAuxService() {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+  }
+
+  /** Initializing the AM container must register the application. */
+  @Test
+  public void testAddApplication() throws Exception {
+    PerNodeTimelineCollectorsAuxService auxService =
+        createCollectorAndAddApplication();
+    // auxService should have a single app
+    assertTrue(auxService.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+    auxService.close();
+  }
+
+  /** Initializing a non-AM container must not register the application. */
+  @Test
+  public void testAddApplicationNonAMContainer() throws Exception {
+    PerNodeTimelineCollectorsAuxService auxService = createCollector();
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.initializeContainer(context);
+    // auxService should not have that app
+    assertFalse(auxService.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+    // release the service like every other test in this class does
+    auxService.close();
+  }
+
+  /** Stopping the AM container must unregister the application. */
+  @Test
+  public void testRemoveApplication() throws Exception {
+    PerNodeTimelineCollectorsAuxService auxService =
+        createCollectorAndAddApplication();
+    // auxService should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(auxService.hasApplication(appIdStr));
+
+    ContainerId containerId = getAMContainerId();
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.stopContainer(context);
+    // auxService should not have that app
+    assertFalse(auxService.hasApplication(appIdStr));
+    auxService.close();
+  }
+
+  /** Stopping a non-AM container must leave the application registered. */
+  @Test
+  public void testRemoveApplicationNonAMContainer() throws Exception {
+    PerNodeTimelineCollectorsAuxService auxService =
+        createCollectorAndAddApplication();
+    // auxService should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(auxService.hasApplication(appIdStr));
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.stopContainer(context);
+    // auxService should still have that app
+    assertTrue(auxService.hasApplication(appIdStr));
+    auxService.close();
+  }
+
+  /**
+   * Launches the service with no arguments and a stubbed collector manager.
+   * System exit is disabled first so that an exit attempt surfaces as an
+   * {@link ExitUtil.ExitException}; any such exception fails the test.
+   */
+  @Test(timeout = 60000)
+  public void testLaunch() throws Exception {
+    ExitUtil.disableSystemExit();
+    PerNodeTimelineCollectorsAuxService auxService = null;
+    try {
+      auxService =
+          PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
+              createCollectorManager());
+    } catch (ExitUtil.ExitException e) {
+      // the status check runs first, then the test fails unconditionally
+      assertEquals(0, e.status);
+      ExitUtil.resetFirstExitException();
+      fail();
+    } finally {
+      if (auxService != null) {
+        auxService.stop();
+      }
+    }
+  }
+
+  /** Creates a service and initializes the AM container on it. */
+  private PerNodeTimelineCollectorsAuxService
+      createCollectorAndAddApplication() {
+    PerNodeTimelineCollectorsAuxService auxService = createCollector();
+    // create an AM container
+    ContainerId containerId = getAMContainerId();
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.initializeContainer(context);
+    return auxService;
+  }
+
+  /** Creates a spied service backed by a stubbed collector manager. */
+  private PerNodeTimelineCollectorsAuxService createCollector() {
+    TimelineCollectorManager collectorManager = createCollectorManager();
+    PerNodeTimelineCollectorsAuxService auxService =
+        spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
+    return auxService;
+  }
+
+  /**
+   * Creates a spied collector manager whose {@code getConfig()} returns a
+   * fresh {@link Configuration} and whose {@code getNMCollectorService()}
+   * returns a mock.
+   */
+  private TimelineCollectorManager createCollectorManager() {
+    TimelineCollectorManager collectorManager =
+        spy(new TimelineCollectorManager());
+    doReturn(new Configuration()).when(collectorManager).getConfig();
+    CollectorNodemanagerProtocol nmCollectorService =
+        mock(CollectorNodemanagerProtocol.class);
+    doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
+    return collectorManager;
+  }
+
+  /** Container id 1 is the one these tests treat as the AM container. */
+  private ContainerId getAMContainerId() {
+    return getContainerId(1L);
+  }
+
+  private ContainerId getContainerId(long id) {
+    return ContainerId.newContainerId(appAttemptId, id);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
new file mode 100644
index 0000000..541665b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.junit.Test;
+
+public class TestTimelineCollectorManager {
+
+ @Test(timeout=60000)
+ public void testMultithreadedAdd() throws Exception {
+ final TimelineCollectorManager collectorManager = createCollectorManager();
+
+ final int NUM_APPS = 5;
+ List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+ for (int i = 0; i < NUM_APPS; i++) {
+ final ApplicationId appId = ApplicationId.newInstance(0L, i);
+ Callable<Boolean> task = new Callable<Boolean>() {
+ public Boolean call() {
+ AppLevelTimelineCollector collector =
+ new AppLevelTimelineCollector(appId.toString());
+ return (collectorManager.putIfAbsent(appId, collector) == collector);
+ }
+ };
+ tasks.add(task);
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+ try {
+ List<Future<Boolean>> futures = executor.invokeAll(tasks);
+ for (Future<Boolean> future: futures) {
+ assertTrue(future.get());
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+ // check the keys
+ for (int i = 0; i < NUM_APPS; i++) {
+ final ApplicationId appId = ApplicationId.newInstance(0L, i);
+ assertTrue(collectorManager.containsKey(appId.toString()));
+ }
+ }
+
+ @Test
+ public void testMultithreadedAddAndRemove() throws Exception {
+ final TimelineCollectorManager collectorManager = createCollectorManager();
+
+ final int NUM_APPS = 5;
+ List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+ for (int i = 0; i < NUM_APPS; i++) {
+ final ApplicationId appId = ApplicationId.newInstance(0L, i);
+ Callable<Boolean> task = new Callable<Boolean>() {
+ public Boolean call() {
+ AppLevelTimelineCollector collector =
+ new AppLevelTimelineCollector(appId.toString());
+ boolean successPut =
+ (collectorManager.putIfAbsent(appId, collector) == collector);
+ return successPut && collectorManager.remove(appId.toString());
+ }
+ };
+ tasks.add(task);
+ }
+ ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+ try {
+ List<Future<Boolean>> futures = executor.invokeAll(tasks);
+ for (Future<Boolean> future: futures) {
+ assertTrue(future.get());
+ }
+ } finally {
+ executor.shutdownNow();
+ }
+ // check the keys
+ for (int i = 0; i < NUM_APPS; i++) {
+ final ApplicationId appId = ApplicationId.newInstance(0L, i);
+ assertFalse(collectorManager.containsKey(appId.toString()));
+ }
+ }
+
+ private TimelineCollectorManager createCollectorManager() {
+ final TimelineCollectorManager collectorManager =
+ spy(new TimelineCollectorManager());
+ doReturn(new Configuration()).when(collectorManager).getConfig();
+ CollectorNodemanagerProtocol nmCollectorService =
+ mock(CollectorNodemanagerProtocol.class);
+ doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
+ return collectorManager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index f720454..7f919f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -27,12 +27,12 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Test;
-import org.apache.commons.io.FileUtils;
public class TestFileSystemTimelineWriterImpl {
@@ -42,9 +42,6 @@ public class TestFileSystemTimelineWriterImpl {
*/
@Test
public void testWriteEntityToFile() throws Exception {
- String name = "unit_test_BaseAggregator_testWriteEntityToFile_"
- + Long.toString(System.currentTimeMillis());
-
TimelineEntities te = new TimelineEntities();
TimelineEntity entity = new TimelineEntity();
String id = "hello";
@@ -55,25 +52,27 @@ public class TestFileSystemTimelineWriterImpl {
entity.setModifiedTime(1425016502000L);
te.addEntity(entity);
- FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl();
- fsi.serviceInit(new Configuration());
- fsi.write(te);
+ try (FileSystemTimelineWriterImpl fsi =
+ new FileSystemTimelineWriterImpl()) {
+ fsi.serviceInit(new Configuration());
+ fsi.write(te);
- String fileName = fsi.getOutputRoot() + "/" + type + "/" + id
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- Path path = Paths.get(fileName);
- File f = new File(fileName);
- assertTrue(f.exists() && !f.isDirectory());
- List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
- // ensure there's only one entity + 1 new line
- assertTrue(data.size() == 2);
- String d = data.get(0);
- // confirm the contents same as what was written
- assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+ String fileName = fsi.getOutputRoot() + "/" + type + "/" + id
+ + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ Path path = Paths.get(fileName);
+ File f = new File(fileName);
+ assertTrue(f.exists() && !f.isDirectory());
+ List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
+ // ensure there's only one entity + 1 new line
+ assertTrue(data.size() == 2);
+ String d = data.get(0);
+ // confirm the contents same as what was written
+ assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
- // delete the directory
- File outputDir = new File(fsi.getOutputRoot());
- FileUtils.deleteDirectory(outputDir);
- assertTrue(!(f.exists()));
+ // delete the directory
+ File outputDir = new File(fsi.getOutputRoot());
+ FileUtils.deleteDirectory(outputDir);
+ assertTrue(!(f.exists()));
+ }
}
}
[2/4] hadoop git commit: YARN-3333. Rename TimelineAggregator etc. to
TimelineCollector. Contributed by Sangjin Lee
Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
new file mode 100644
index 0000000..009fa63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.collectormanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+
+public class NMCollectorService extends CompositeService implements
+    CollectorNodemanagerProtocol {
+
+  private static final Log LOG = LogFactory.getLog(NMCollectorService.class);
+
+  final Context context;
+
+  private Server server;
+
+  /** Creates the service around the given NM context. */
+  public NMCollectorService(Context context) {
+    super(NMCollectorService.class.getName());
+    this.context = context;
+  }
+
+  /** Creates and starts the RPC server, then starts any child services. */
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    InetSocketAddress bindAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
+
+    // The server is handed its own copy of the service configuration.
+    Configuration serverConf = new Configuration(conf);
+
+    // TODO Security settings.
+    YarnRPC rpc = YarnRPC.create(conf);
+    server = rpc.getServer(
+        CollectorNodemanagerProtocol.class, this, bindAddress, serverConf,
+        this.context.getNMTokenSecretManager(),
+        conf.getInt(YarnConfiguration.NM_COLLECTOR_SERVICE_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT));
+    server.start();
+    // start remaining services
+    super.serviceStart();
+    LOG.info("NMCollectorService started at " + bindAddress);
+  }
+
+  /** Stops the RPC server if one was created, then any child services. */
+  @Override
+  public void serviceStop() throws Exception {
+    if (server != null) {
+      server.stop();
+    }
+    // TODO may cleanup app collectors running on this NM in future.
+    super.serviceStop();
+  }
+
+  /**
+   * Registers each reported app's collector address with the NM context.
+   * A newly created response is returned whether or not anything was reported.
+   */
+  @Override
+  public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+      ReportNewCollectorInfoRequest request) throws IOException {
+    List<AppCollectorsMap> reported = request.getAppCollectorsList();
+    if (reported == null || reported.isEmpty()) {
+      return ReportNewCollectorInfoResponse.newInstance();
+    }
+    Map<ApplicationId, String> addrByApp = new HashMap<ApplicationId, String>();
+    for (AppCollectorsMap entry : reported) {
+      addrByApp.put(entry.getApplicationId(), entry.getCollectorAddr());
+    }
+    ((NodeManager.NMContext) context).addRegisteredCollectors(addrByApp);
+    return ReportNewCollectorInfoResponse.newInstance();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 6bf3bbf..5f84b4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -425,10 +425,11 @@ public class ApplicationImpl implements Application {
new LogHandlerAppFinishedEvent(app.appId));
app.context.getNMTokenSecretManager().appFinished(app.getAppId());
- // Remove aggregator info for finished apps.
- // TODO check we remove related aggregators info in failure cases (YARN-3038)
- app.context.getRegisteredAggregators().remove(app.getAppId());
- app.context.getKnownAggregators().remove(app.getAppId());
+ // Remove collectors info for finished apps.
+ // TODO check we remove related collectors info in failure cases
+ // (YARN-3038)
+ app.context.getRegisteredCollectors().remove(app.getAppId());
+ app.context.getKnownCollectors().remove(app.getAppId());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 2eb1a7f..b1f0472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -344,8 +344,8 @@ public class ApplicationMasterService extends AbstractService implements
RMApp rmApp =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
- // Remove aggregator address when app get finished.
- rmApp.removeAggregatorAddr();
+ // Remove the collector address once the app has finished.
+ rmApp.removeCollectorAddr();
// checking whether the app exits in RMStateStore at first not to throw
// ApplicationDoesNotExistInCacheException before and after
// RM work-preserving restart.
@@ -578,10 +578,10 @@ public class ApplicationMasterService extends AbstractService implements
allocateResponse.setAvailableResources(allocation.getResourceLimit());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-
- // add aggregator address for this application
- allocateResponse.setAggregatorAddr(
- this.rmContext.getRMApps().get(applicationId).getAggregatorAddr());
+
+ // add collector address for this application
+ allocateResponse.setCollectorAddr(
+ this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
// add preemption to the allocateResponse message (if any)
allocateResponse
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index f163a28..16aae40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -410,11 +410,11 @@ public class ResourceTrackerService extends AbstractService implements
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
return resync;
}
-
- // Check & update aggregators info from request.
+
+ // Check & update collectors info from request.
// TODO make sure it won't have race condition issue for AM failed over case
// that the older registration could possible override the newer one.
- updateAppAggregatorsMap(request);
+ updateAppCollectorsMap(request);
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@@ -430,13 +430,14 @@ public class ResourceTrackerService extends AbstractService implements
if (!systemCredentials.isEmpty()) {
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
-
- // Return aggregators' map that NM needs to know
- // TODO we should optimize this to only include aggreator info that NM
+
+ // Return collectors' map that NM needs to know
+ // TODO we should optimize this to only include collector info that NM
// doesn't know yet.
- List<ApplicationId> keepAliveApps = remoteNodeStatus.getKeepAliveApplications();
+ List<ApplicationId> keepAliveApps =
+ remoteNodeStatus.getKeepAliveApplications();
if (keepAliveApps != null) {
- setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
+ setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
}
// 4. Send status to RMNode, saving the latest response.
@@ -447,48 +448,49 @@ public class ResourceTrackerService extends AbstractService implements
return nodeHeartBeatResponse;
}
-
- private void setAppAggregatorsMapToResponse(
+
+ private void setAppCollectorsMapToResponse(
List<ApplicationId> liveApps, NodeHeartbeatResponse response) {
- Map<ApplicationId, String> liveAppAggregatorsMap = new
+ Map<ApplicationId, String> liveAppCollectorsMap = new
ConcurrentHashMap<ApplicationId, String>();
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
for (ApplicationId appId : liveApps) {
- String appAggregatorAddr = rmApps.get(appId).getAggregatorAddr();
- if (appAggregatorAddr != null) {
- liveAppAggregatorsMap.put(appId, appAggregatorAddr);
+ String appCollectorAddr = rmApps.get(appId).getCollectorAddr();
+ if (appCollectorAddr != null) {
+ liveAppCollectorsMap.put(appId, appCollectorAddr);
} else {
- // Log a debug info if aggregator address is not found.
+ // Log a debug info if collector address is not found.
if (LOG.isDebugEnabled()) {
- LOG.debug("Aggregator for applicaton: " + appId + " hasn't registered yet!");
+ LOG.debug("Collector for applicaton: " + appId +
+ " hasn't registered yet!");
}
}
}
- response.setAppAggregatorsMap(liveAppAggregatorsMap);
+ response.setAppCollectorsMap(liveAppCollectorsMap);
}
-
- private void updateAppAggregatorsMap(NodeHeartbeatRequest request) {
- Map<ApplicationId, String> registeredAggregatorsMap =
- request.getRegisteredAggregators();
- if (registeredAggregatorsMap != null
- && !registeredAggregatorsMap.isEmpty()) {
+
+ private void updateAppCollectorsMap(NodeHeartbeatRequest request) {
+ Map<ApplicationId, String> registeredCollectorsMap =
+ request.getRegisteredCollectors();
+ if (registeredCollectorsMap != null
+ && !registeredCollectorsMap.isEmpty()) {
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
- for (Map.Entry<ApplicationId, String> entry:
- registeredAggregatorsMap.entrySet()) {
+ for (Map.Entry<ApplicationId, String> entry:
+ registeredCollectorsMap.entrySet()) {
ApplicationId appId = entry.getKey();
- String aggregatorAddr = entry.getValue();
- if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) {
+ String collectorAddr = entry.getValue();
+ if (collectorAddr != null && !collectorAddr.isEmpty()) {
RMApp rmApp = rmApps.get(appId);
if (rmApp == null) {
- LOG.warn("Cannot update aggregator info because application ID: " +
+ LOG.warn("Cannot update collector info because application ID: " +
appId + " is not found in RMContext!");
} else {
- String previousAggregatorAddr = rmApp.getAggregatorAddr();
- if (previousAggregatorAddr == null ||
- previousAggregatorAddr != aggregatorAddr) {
- // sending aggregator update event.
- RMAppAggregatorUpdateEvent event =
- new RMAppAggregatorUpdateEvent(appId, aggregatorAddr);
+ String previousCollectorAddr = rmApp.getCollectorAddr();
+ if (previousCollectorAddr == null ||
+ !previousCollectorAddr.equals(collectorAddr)) {
+ // Send an update only when the address differs by value, not identity.
+ RMAppCollectorUpdateEvent event =
+ new RMAppCollectorUpdateEvent(appId, collectorAddr);
rmContext.getDispatcher().getEventHandler().handle(event);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index f81edb2..32cf2cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -172,23 +172,23 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return the tracking url for the application master.
*/
String getTrackingUrl();
-
+
/**
- * The aggregator address for the application.
- * @return the address for the application's aggregator.
+ * The collector address for the application.
+ * @return the address for the application's collector.
*/
- String getAggregatorAddr();
-
+ String getCollectorAddr();
+
/**
- * Set aggregator address for the application
- * @param aggregatorAddr the address of aggregator
+ * Set collector address for the application
+ * @param collectorAddr the address of collector
*/
- void setAggregatorAddr(String aggregatorAddr);
-
+ void setCollectorAddr(String collectorAddr);
+
/**
- * Remove aggregator address when application is finished or killed.
+ * Remove collector address when application is finished or killed.
*/
- void removeAggregatorAddr();
+ void removeCollectorAddr();
/**
* The original tracking url for the application master.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
deleted file mode 100644
index b43de44..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-public class RMAppAggregatorUpdateEvent extends RMAppEvent {
-
- private final String appAggregatorAddr;
-
- public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) {
- super(appId, RMAppEventType.AGGREGATOR_UPDATE);
- this.appAggregatorAddr = appAggregatorAddr;
- }
-
- public String getAppAggregatorAddr(){
- return this.appAggregatorAddr;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
new file mode 100644
index 0000000..698c9b5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class RMAppCollectorUpdateEvent extends RMAppEvent {
+  /** Collector address carried by this event. */
+  private final String appCollectorAddr;
+  /** Creates a COLLECTOR_UPDATE event for appId with the given address. */
+  public RMAppCollectorUpdateEvent(ApplicationId appId,
+      String appCollectorAddr) {
+    super(appId, RMAppEventType.COLLECTOR_UPDATE);
+    this.appCollectorAddr = appCollectorAddr;
+  }
+
+  /** Returns the address that was passed to the constructor. */
+  public String getAppCollectorAddr() {
+    return appCollectorAddr;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index 6e9460a..2b42638 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -30,9 +30,9 @@ public enum RMAppEventType {
// Source: Scheduler
APP_ACCEPTED,
-
+
// TODO add source later
- AGGREGATOR_UPDATE,
+ COLLECTOR_UPDATE,
// Source: RMAppAttempt
ATTEMPT_REGISTERED,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 6a076ac..61c5748 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -134,7 +134,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private long startTime;
private long finishTime = 0;
private long storedFinishTime = 0;
- private String aggregatorAddr;
+ private String collectorAddr;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
private String queue;
@@ -167,7 +167,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@@ -185,7 +185,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@@ -205,7 +205,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(
@@ -223,7 +223,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED)
.addTransition(RMAppState.ACCEPTED,
@@ -251,7 +251,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition(
@@ -282,7 +282,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -295,7 +295,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -308,7 +308,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
@@ -505,20 +505,20 @@ public class RMAppImpl implements RMApp, Recoverable {
public void setQueue(String queue) {
this.queue = queue;
}
-
+
@Override
- public String getAggregatorAddr() {
- return this.aggregatorAddr;
+ public String getCollectorAddr() {
+ return this.collectorAddr;
}
-
+
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
- this.aggregatorAddr = aggregatorAddr;
+ public void setCollectorAddr(String collectorAddr) {
+ this.collectorAddr = collectorAddr;
}
-
+
@Override
- public void removeAggregatorAddr() {
- this.aggregatorAddr = null;
+ public void removeCollectorAddr() {
+ this.collectorAddr = null;
}
@Override
@@ -769,8 +769,8 @@ public class RMAppImpl implements RMApp, Recoverable {
this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
- //TODO recover aggregator address.
- //this.aggregatorAddr = appState.getAggregatorAddr();
+ //TODO recover collector address.
+ //this.collectorAddr = appState.getCollectorAddr();
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
@@ -814,22 +814,22 @@ public class RMAppImpl implements RMApp, Recoverable {
};
}
- private static final class RMAppAggregatorUpdateTransition
+ private static final class RMAppCollectorUpdateTransition
extends RMAppTransition {
-
+
public void transition(RMAppImpl app, RMAppEvent event) {
- LOG.info("Updating aggregator info for app: " + app.getApplicationId());
-
- RMAppAggregatorUpdateEvent appAggregatorUpdateEvent =
- (RMAppAggregatorUpdateEvent) event;
- // Update aggregator address
- app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr());
-
+ LOG.info("Updating collector info for app: " + app.getApplicationId());
+
+ RMAppCollectorUpdateEvent appCollectorUpdateEvent =
+ (RMAppCollectorUpdateEvent) event;
+ // Update collector address
+ app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
+
// TODO persistent to RMStateStore for recover
// Save to RMStateStore
};
}
-
+
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 0d0895a..2e1a27e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -92,15 +92,15 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public String getAggregatorAddr() {
+ public String getCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
+ public void setCollectorAddr(String collectorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public void removeAggregatorAddr() {
+ public void removeCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 96952d2..58dcacf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -273,17 +273,17 @@ public class MockRMApp implements RMApp {
}
@Override
- public String getAggregatorAddr() {
+ public String getCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
-
+
@Override
- public void removeAggregatorAddr() {
+ public void removeCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
+ public void setCollectorAddr(String collectorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
index ae5efa5..1fef76e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
@@ -56,6 +56,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 32ee5d8..fab131c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -1,25 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.yarn.server.timelineservice;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.fail;
-
public class TestTimelineServiceClientIntegration {
- private static PerNodeTimelineAggregatorsAuxService auxService;
+ private static TimelineCollectorManager collectorManager;
+ private static PerNodeTimelineCollectorsAuxService auxService;
@BeforeClass
public static void setupClass() throws Exception {
try {
- auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
+ collectorManager = new MyTimelineCollectorManager();
+ auxService =
+ PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
+ collectorManager);
auxService.addApplication(ApplicationId.newInstance(0, 1));
} catch (ExitUtil.ExitException e) {
fail();
@@ -38,6 +63,9 @@ public class TestTimelineServiceClientIntegration {
TimelineClient client =
TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
try {
+ // set the timeline service address manually
+ client.setTimelineServiceAddress(
+ collectorManager.getRestServerBindAddress());
client.init(new YarnConfiguration());
client.start();
TimelineEntity entity = new TimelineEntity();
@@ -45,10 +73,20 @@ public class TestTimelineServiceClientIntegration {
entity.setId("test entity id");
client.putEntities(entity);
client.putEntitiesAsync(entity);
- } catch(Exception e) {
- fail();
} finally {
client.stop();
}
}
+
+ private static class MyTimelineCollectorManager extends
+ TimelineCollectorManager {
+ public MyTimelineCollectorManager() {
+ super();
+ }
+
+ @Override
+ protected CollectorNodemanagerProtocol getNMCollectorService() {
+ return mock(CollectorNodemanagerProtocol.class);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index 26790f1..f974aee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -58,6 +58,11 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
</dependency>
@@ -72,6 +77,11 @@
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
deleted file mode 100644
index 95ec9f8..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage for a given YARN application.
- *
- * App-related lifecycle management is handled by this service.
- */
-@Private
-@Unstable
-public class AppLevelTimelineAggregator extends TimelineAggregator {
- private final String applicationId;
- // TODO define key metadata such as flow metadata, user, and queue
-
- public AppLevelTimelineAggregator(String applicationId) {
- super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId);
- this.applicationId = applicationId;
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- super.serviceStop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
deleted file mode 100644
index 19920fd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.nio.ByteBuffer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.hadoop.yarn.server.api.ContainerContext;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * The top-level server for the per-node timeline aggregator collection. Currently
- * it is defined as an auxiliary service to accommodate running within another
- * daemon (e.g. node manager).
- */
-@Private
-@Unstable
-public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
- private static final Log LOG =
- LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class);
- private static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
- private final TimelineAggregatorsCollection aggregatorCollection;
-
- public PerNodeTimelineAggregatorsAuxService() {
- // use the same singleton
- this(TimelineAggregatorsCollection.getInstance());
- }
-
- @VisibleForTesting PerNodeTimelineAggregatorsAuxService(
- TimelineAggregatorsCollection aggregatorCollection) {
- super("timeline_aggregator");
- this.aggregatorCollection = aggregatorCollection;
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- aggregatorCollection.init(conf);
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- aggregatorCollection.start();
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- aggregatorCollection.stop();
- super.serviceStop();
- }
-
- // these methods can be used as the basis for future service methods if the
- // per-node aggregator runs separate from the node manager
- /**
- * Creates and adds an app level aggregator for the specified application id.
- * The aggregator is also initialized and started. If the service already
- * exists, no new service is created.
- *
- * @return whether it was added successfully
- */
- public boolean addApplication(ApplicationId appId) {
- AppLevelTimelineAggregator aggregator =
- new AppLevelTimelineAggregator(appId.toString());
- return (aggregatorCollection.putIfAbsent(appId, aggregator)
- == aggregator);
- }
-
- /**
- * Removes the app level aggregator for the specified application id. The
- * aggregator is also stopped as a result. If the aggregator does not exist, no
- * change is made.
- *
- * @return whether it was removed successfully
- */
- public boolean removeApplication(ApplicationId appId) {
- String appIdString = appId.toString();
- return aggregatorCollection.remove(appIdString);
- }
-
- /**
- * Creates and adds an app level aggregator for the specified application id.
- * The aggregator is also initialized and started. If the aggregator already
- * exists, no new aggregator is created.
- */
- @Override
- public void initializeContainer(ContainerInitializationContext context) {
- // intercept the event of the AM container being created and initialize the
- // app level aggregator service
- if (isApplicationMaster(context)) {
- ApplicationId appId = context.getContainerId().
- getApplicationAttemptId().getApplicationId();
- addApplication(appId);
- }
- }
-
- /**
- * Removes the app level aggregator for the specified application id. The
- * aggregator is also stopped as a result. If the aggregator does not exist, no
- * change is made.
- */
- @Override
- public void stopContainer(ContainerTerminationContext context) {
- // intercept the event of the AM container being stopped and remove the app
- // level aggregator service
- if (isApplicationMaster(context)) {
- ApplicationId appId = context.getContainerId().
- getApplicationAttemptId().getApplicationId();
- removeApplication(appId);
- }
- }
-
- private boolean isApplicationMaster(ContainerContext context) {
- // TODO this is based on a (shaky) assumption that the container id (the
- // last field of the full container id) for an AM is always 1
- // we want to make this much more reliable
- ContainerId containerId = context.getContainerId();
- return containerId.getContainerId() == 1L;
- }
-
- @VisibleForTesting
- boolean hasApplication(String appId) {
- return aggregatorCollection.containsKey(appId);
- }
-
- @Override
- public void initializeApplication(ApplicationInitializationContext context) {
- }
-
- @Override
- public void stopApplication(ApplicationTerminationContext context) {
- }
-
- @Override
- public ByteBuffer getMetaData() {
- // TODO currently it is not used; we can return a more meaningful data when
- // we connect it with an AM
- return ByteBuffer.allocate(0);
- }
-
- @VisibleForTesting
- public static PerNodeTimelineAggregatorsAuxService launchServer(String[] args) {
- Thread
- .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
- StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, args,
- LOG);
- PerNodeTimelineAggregatorsAuxService auxService = null;
- try {
- auxService = new PerNodeTimelineAggregatorsAuxService();
- ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
- SHUTDOWN_HOOK_PRIORITY);
- YarnConfiguration conf = new YarnConfiguration();
- auxService.init(conf);
- auxService.start();
- } catch (Throwable t) {
- LOG.fatal("Error starting PerNodeAggregatorServer", t);
- ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
- }
- return auxService;
- }
-
- private static class ShutdownHook implements Runnable {
- private final PerNodeTimelineAggregatorsAuxService auxService;
-
- public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) {
- this.auxService = auxService;
- }
-
- public void run() {
- auxService.stop();
- }
- }
-
- public static void main(String[] args) {
- launchServer(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
deleted file mode 100644
index dbd0895..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
-import org.apache.hadoop.util.ReflectionUtils;
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage.
- *
- * Classes that extend this can add their own lifecycle management or
- * customization of request handling.
- */
-@Private
-@Unstable
-public abstract class TimelineAggregator extends CompositeService {
- private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
-
- private TimelineWriter writer;
-
- public TimelineAggregator(String name) {
- super(name);
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- writer = ReflectionUtils.newInstance(conf.getClass(
- YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
- FileSystemTimelineWriterImpl.class,
- TimelineWriter.class), conf);
- writer.init(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- super.serviceStop();
- writer.stop();
- }
-
- public TimelineWriter getWriter() {
- return writer;
- }
-
- /**
- * Handles entity writes. These writes are synchronous and are written to the
- * backing storage without buffering/batching. If any entity already exists,
- * it results in an update of the entity.
- *
- * This method should be reserved for selected critical entities and events.
- * For normal voluminous writes one should use the async method
- * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
- *
- * @param entities entities to post
- * @param callerUgi the caller UGI
- * @return the response that contains the result of the post.
- */
- public TimelineWriteResponse postEntities(TimelineEntities entities,
- UserGroupInformation callerUgi) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
- LOG.debug("postEntities(entities=" + entities + ", callerUgi="
- + callerUgi + ")");
- }
-
- return writer.write(entities);
- }
-
- /**
- * Handles entity writes in an asynchronous manner. The method returns as soon
- * as validation is done. No promises are made on how quickly it will be
- * written to the backing storage or if it will always be written to the
- * backing storage. Multiple writes to the same entities may be batched and
- * appropriate values updated and result in fewer writes to the backing
- * storage.
- *
- * @param entities entities to post
- * @param callerUgi the caller UGI
- */
- public void postEntitiesAsync(TimelineEntities entities,
- UserGroupInformation callerUgi) {
- // TODO implement
- if (LOG.isDebugEnabled()) {
- LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
- callerUgi + ")");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
deleted file mode 100644
index 7d42f94..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.*;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.webapp.ForbiddenException;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
-
-import com.google.inject.Singleton;
-
-/**
- * The main per-node REST end point for timeline service writes. It is
- * essentially a container service that routes requests to the appropriate
- * per-app services.
- */
-@Private
-@Unstable
-@Singleton
-@Path("/ws/v2/timeline")
-public class TimelineAggregatorWebService {
- private static final Log LOG =
- LogFactory.getLog(TimelineAggregatorWebService.class);
-
- private @Context ServletContext context;
-
- @XmlRootElement(name = "about")
- @XmlAccessorType(XmlAccessType.NONE)
- @Public
- @Unstable
- public static class AboutInfo {
-
- private String about;
-
- public AboutInfo() {
-
- }
-
- public AboutInfo(String about) {
- this.about = about;
- }
-
- @XmlElement(name = "About")
- public String getAbout() {
- return about;
- }
-
- public void setAbout(String about) {
- this.about = about;
- }
-
- }
-
- /**
- * Return the description of the timeline web services.
- */
- @GET
- @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public AboutInfo about(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res) {
- init(res);
- return new AboutInfo("Timeline API");
- }
-
- /**
- * Accepts writes to the aggregator, and returns a response. It simply routes
- * the request to the app level aggregator. It expects an application as a
- * context.
- */
- @PUT
- @Path("/entities")
- @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public Response putEntities(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- @QueryParam("async") String async,
- @QueryParam("appid") String appId,
- TimelineEntities entities) {
- init(res);
- UserGroupInformation callerUgi = getUser(req);
- if (callerUgi == null) {
- String msg = "The owner of the posted timeline entities is not set";
- LOG.error(msg);
- throw new ForbiddenException(msg);
- }
-
- // TODO how to express async posts and handle them
- boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
-
- try {
- appId = parseApplicationId(appId);
- if (appId == null) {
- return Response.status(Response.Status.BAD_REQUEST).build();
- }
- TimelineAggregator service = getAggregatorService(req, appId);
- if (service == null) {
- LOG.error("Application not found");
- throw new NotFoundException(); // different exception?
- }
- service.postEntities(entities, callerUgi);
- return Response.ok().build();
- } catch (Exception e) {
- LOG.error("Error putting entities", e);
- throw new WebApplicationException(e,
- Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- private String parseApplicationId(String appId) {
- // Make sure the appId is not null and is valid
- ApplicationId appID;
- try {
- if (appId != null) {
- return ConverterUtils.toApplicationId(appId.trim()).toString();
- } else {
- return null;
- }
- } catch (Exception e) {
- return null;
- }
- }
-
- private TimelineAggregator
- getAggregatorService(HttpServletRequest req, String appIdToParse) {
- String appIdString = parseApplicationId(appIdToParse);
- final TimelineAggregatorsCollection aggregatorCollection =
- (TimelineAggregatorsCollection) context.getAttribute(
- TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY);
- return aggregatorCollection.get(appIdString);
- }
-
- private void init(HttpServletResponse response) {
- response.setContentType(null);
- }
-
- private UserGroupInformation getUser(HttpServletRequest req) {
- String remoteUser = req.getRemoteUser();
- UserGroupInformation callerUgi = null;
- if (remoteUser != null) {
- callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
- }
- return callerUgi;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
deleted file mode 100644
index d6e2a18..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.HttpServer2;
-import org.apache.hadoop.http.lib.StaticUserWebFilter;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
-
-/**
- * Class that manages adding and removing aggregators and their lifecycle. It
- * provides thread safety access to the aggregators inside.
- *
- * It is a singleton, and instances should be obtained via
- * {@link #getInstance()}.
- */
-@Private
-@Unstable
-public class TimelineAggregatorsCollection extends CompositeService {
- private static final Log LOG =
- LogFactory.getLog(TimelineAggregatorsCollection.class);
- private static final TimelineAggregatorsCollection INSTANCE =
- new TimelineAggregatorsCollection();
-
- // access to this map is synchronized with the map itself
- private final Map<String, TimelineAggregator> aggregators =
- Collections.synchronizedMap(
- new HashMap<String, TimelineAggregator>());
-
- // REST server for this aggregator collection
- private HttpServer2 timelineRestServer;
-
- private String timelineRestServerBindAddress;
-
- private AggregatorNodemanagerProtocol nmAggregatorService;
-
- private InetSocketAddress nmAggregatorServiceAddress;
-
- static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
-
- static TimelineAggregatorsCollection getInstance() {
- return INSTANCE;
- }
-
- TimelineAggregatorsCollection() {
- super(TimelineAggregatorsCollection.class.getName());
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- this.nmAggregatorServiceAddress = conf.getSocketAddr(
- YarnConfiguration.NM_BIND_HOST,
- YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
-
- }
-
- @Override
- protected void serviceStart() throws Exception {
- startWebApp();
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- if (timelineRestServer != null) {
- timelineRestServer.stop();
- }
- super.serviceStop();
- }
-
- /**
- * Put the aggregator into the collection if an aggregator mapped by id does
- * not exist.
- *
- * @throws YarnRuntimeException if there was any exception in initializing and
- * starting the app level service
- * @return the aggregator associated with id after the potential put.
- */
- public TimelineAggregator putIfAbsent(ApplicationId appId,
- TimelineAggregator aggregator) {
- String id = appId.toString();
- TimelineAggregator aggregatorInTable;
- boolean aggregatorIsNew = false;
- synchronized (aggregators) {
- aggregatorInTable = aggregators.get(id);
- if (aggregatorInTable == null) {
- try {
- // initialize, start, and add it to the collection so it can be
- // cleaned up when the parent shuts down
- aggregator.init(getConfig());
- aggregator.start();
- aggregators.put(id, aggregator);
- LOG.info("the aggregator for " + id + " was added");
- aggregatorInTable = aggregator;
- aggregatorIsNew = true;
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
- }
- } else {
- String msg = "the aggregator for " + id + " already exists!";
- LOG.error(msg);
- }
-
- }
- // Report to NM if a new aggregator is added.
- if (aggregatorIsNew) {
- try {
- reportNewAggregatorToNM(appId);
- } catch (Exception e) {
- // throw exception here as it cannot be used if failed report to NM
- LOG.error("Failed to report a new aggregator for application: " + appId +
- " to NM Aggregator Services.");
- throw new YarnRuntimeException(e);
- }
- }
-
- return aggregatorInTable;
- }
-
- /**
- * Removes the aggregator for the specified id. The aggregator is also stopped
- * as a result. If the aggregator does not exist, no change is made.
- *
- * @return whether it was removed successfully
- */
- public boolean remove(String id) {
- synchronized (aggregators) {
- TimelineAggregator aggregator = aggregators.remove(id);
- if (aggregator == null) {
- String msg = "the aggregator for " + id + " does not exist!";
- LOG.error(msg);
- return false;
- } else {
- // stop the service to do clean up
- aggregator.stop();
- LOG.info("the aggregator service for " + id + " was removed");
- return true;
- }
- }
- }
-
- /**
- * Returns the aggregator for the specified id.
- *
- * @return the aggregator or null if it does not exist
- */
- public TimelineAggregator get(String id) {
- return aggregators.get(id);
- }
-
- /**
- * Returns whether the aggregator for the specified id exists in this
- * collection.
- */
- public boolean containsKey(String id) {
- return aggregators.containsKey(id);
- }
-
- /**
- * Launch the REST web server for this aggregator collection
- */
- private void startWebApp() {
- Configuration conf = getConfig();
- // use the same ports as the old ATS for now; we could create new properties
- // for the new timeline service if needed
- String bindAddress = WebAppUtils.getWebAppBindURL(conf,
- YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
- WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
- this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
- NetUtils.createSocketAddr(bindAddress));
- LOG.info("Instantiating the per-node aggregator webapp at " +
- timelineRestServerBindAddress);
- try {
- Configuration confForInfoServer = new Configuration(conf);
- confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
- HttpServer2.Builder builder = new HttpServer2.Builder()
- .setName("timeline")
- .setConf(conf)
- .addEndpoint(URI.create("http://" + bindAddress));
- timelineRestServer = builder.build();
- // TODO: replace this by an authentication filter in future.
- HashMap<String, String> options = new HashMap<>();
- String username = conf.get(HADOOP_HTTP_STATIC_USER,
- DEFAULT_HADOOP_HTTP_STATIC_USER);
- options.put(HADOOP_HTTP_STATIC_USER, username);
- HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
- "static_user_filter_timeline",
- StaticUserWebFilter.StaticUserFilter.class.getName(),
- options, new String[] {"/*"});
-
- timelineRestServer.addJerseyResourcePackage(
- TimelineAggregatorWebService.class.getPackage().getName() + ";"
- + GenericExceptionHandler.class.getPackage().getName() + ";"
- + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
- "/*");
- timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
- TimelineAggregatorsCollection.getInstance());
- timelineRestServer.start();
- } catch (Exception e) {
- String msg = "The per-node aggregator webapp failed to start.";
- LOG.error(msg, e);
- throw new YarnRuntimeException(msg, e);
- }
- }
-
- private void reportNewAggregatorToNM(ApplicationId appId)
- throws YarnException, IOException {
- this.nmAggregatorService = getNMAggregatorService();
- ReportNewAggregatorsInfoRequest request =
- ReportNewAggregatorsInfoRequest.newInstance(appId,
- this.timelineRestServerBindAddress);
- LOG.info("Report a new aggregator for application: " + appId +
- " to NM Aggregator Services.");
- nmAggregatorService.reportNewAggregatorInfo(request);
- }
-
- // protected for test
- protected AggregatorNodemanagerProtocol getNMAggregatorService(){
- Configuration conf = getConfig();
- final YarnRPC rpc = YarnRPC.create(conf);
-
- // TODO Security settings.
- return (AggregatorNodemanagerProtocol) rpc.getProxy(
- AggregatorNodemanagerProtocol.class,
- nmAggregatorServiceAddress, conf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
new file mode 100644
index 0000000..7d59876
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service that handles timeline service writes for a given YARN application
+ * and persists them to the backing storage.
+ *
+ * App-related lifecycle management is handled by this service.
+ */
+@Private
+@Unstable
+public class AppLevelTimelineCollector extends TimelineCollector {
+ private final String applicationId; // set by the constructor; not read anywhere else in this class
+ // TODO define key metadata such as flow metadata, user, and queue
+
+ public AppLevelTimelineCollector(String applicationId) {
+ super(AppLevelTimelineCollector.class.getName() + " - " + applicationId); // parent receives "<class name> - <application id>"
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf); // adds nothing of its own; delegates to the parent
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart(); // adds nothing of its own; delegates to the parent
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop(); // adds nothing of its own; delegates to the parent
+ }
+
+}
[4/4] hadoop git commit: YARN-3333. Rename TimelineAggregator etc. to
TimelineCollector. Contributed by Sangjin Lee
Posted by ju...@apache.org.
YARN-3333. Rename TimelineAggregator etc. to TimelineCollector. Contributed by Sangjin Lee
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dda84085
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dda84085
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dda84085
Branch: refs/heads/YARN-2928
Commit: dda84085cabd8fdf143b380e54e1730802fd9912
Parents: 8a63791
Author: Junping Du <ju...@apache.org>
Authored: Thu Mar 19 11:49:07 2015 -0700
Committer: Junping Du <ju...@apache.org>
Committed: Thu Mar 19 11:49:07 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 5 +-
.../hadoop-yarn/hadoop-yarn-api/pom.xml | 4 +
.../api/protocolrecords/AllocateResponse.java | 20 +-
.../timelineservice/TimelineWriteResponse.java | 20 +-
.../hadoop/yarn/conf/YarnConfiguration.java | 20 +-
.../src/main/proto/yarn_service_protos.proto | 2 +-
.../pom.xml | 10 +
.../distributedshell/ApplicationMaster.java | 62 ++---
.../applications/distributedshell/Client.java | 8 +-
.../distributedshell/TestDistributedShell.java | 10 +-
.../hadoop/yarn/client/api/AMRMClient.java | 6 +-
.../yarn/client/api/async/AMRMClientAsync.java | 4 +-
.../api/async/impl/AMRMClientAsyncImpl.java | 20 +-
.../impl/pb/AllocateResponsePBImpl.java | 16 +-
.../hadoop/yarn/client/api/TimelineClient.java | 2 +-
.../client/api/impl/TimelineClientImpl.java | 32 +--
.../src/main/resources/yarn-default.xml | 14 +-
.../hadoop/yarn/TestContainerLaunchRPC.java | 2 +-
.../hadoop/yarn/api/TestAllocateResponse.java | 12 +-
.../hadoop-yarn-server-common/pom.xml | 2 +-
.../api/AggregatorNodemanagerProtocol.java | 56 ----
.../api/AggregatorNodemanagerProtocolPB.java | 33 ---
.../api/CollectorNodemanagerProtocol.java | 57 ++++
.../api/CollectorNodemanagerProtocolPB.java | 33 +++
...gregatorNodemanagerProtocolPBClientImpl.java | 94 -------
...ollectorNodemanagerProtocolPBClientImpl.java | 94 +++++++
...regatorNodemanagerProtocolPBServiceImpl.java | 61 ----
...llectorNodemanagerProtocolPBServiceImpl.java | 59 ++++
.../protocolrecords/NodeHeartbeatRequest.java | 16 +-
.../protocolrecords/NodeHeartbeatResponse.java | 8 +-
.../ReportNewAggregatorsInfoRequest.java | 53 ----
.../ReportNewAggregatorsInfoResponse.java | 32 ---
.../ReportNewCollectorInfoRequest.java | 53 ++++
.../ReportNewCollectorInfoResponse.java | 32 +++
.../impl/pb/NodeHeartbeatRequestPBImpl.java | 62 ++---
.../impl/pb/NodeHeartbeatResponsePBImpl.java | 60 ++--
.../ReportNewAggregatorsInfoRequestPBImpl.java | 142 ----------
.../ReportNewAggregatorsInfoResponsePBImpl.java | 74 -----
.../pb/ReportNewCollectorInfoRequestPBImpl.java | 142 ++++++++++
.../ReportNewCollectorInfoResponsePBImpl.java | 74 +++++
.../server/api/records/AppAggregatorsMap.java | 33 ---
.../server/api/records/AppCollectorsMap.java | 46 +++
.../impl/pb/AppAggregatorsMapPBImpl.java | 151 ----------
.../records/impl/pb/AppCollectorsMapPBImpl.java | 151 ++++++++++
.../proto/aggregatornodemanager_protocol.proto | 29 --
.../proto/collectornodemanager_protocol.proto | 29 ++
.../yarn_server_common_service_protos.proto | 18 +-
.../java/org/apache/hadoop/yarn/TestRPC.java | 116 ++++----
.../hadoop/yarn/TestYarnServerApiClasses.java | 24 +-
.../hadoop/yarn/server/nodemanager/Context.java | 14 +-
.../yarn/server/nodemanager/NodeManager.java | 60 ++--
.../nodemanager/NodeStatusUpdaterImpl.java | 11 +-
.../aggregatormanager/NMAggregatorService.java | 113 --------
.../collectormanager/NMCollectorService.java | 110 ++++++++
.../application/ApplicationImpl.java | 9 +-
.../ApplicationMasterService.java | 12 +-
.../resourcemanager/ResourceTrackerService.java | 72 ++---
.../server/resourcemanager/rmapp/RMApp.java | 22 +-
.../rmapp/RMAppAggregatorUpdateEvent.java | 36 ---
.../rmapp/RMAppCollectorUpdateEvent.java | 37 +++
.../resourcemanager/rmapp/RMAppEventType.java | 4 +-
.../server/resourcemanager/rmapp/RMAppImpl.java | 60 ++--
.../applicationsmanager/MockAsm.java | 6 +-
.../server/resourcemanager/rmapp/MockRMApp.java | 8 +-
.../hadoop-yarn-server-tests/pom.xml | 5 +
.../TestTimelineServiceClientIntegration.java | 52 +++-
.../hadoop-yarn-server-timelineservice/pom.xml | 10 +
.../aggregator/AppLevelTimelineAggregator.java | 57 ----
.../PerNodeTimelineAggregatorsAuxService.java | 211 --------------
.../aggregator/TimelineAggregator.java | 122 --------
.../TimelineAggregatorWebService.java | 180 ------------
.../TimelineAggregatorsCollection.java | 271 ------------------
.../collector/AppLevelTimelineCollector.java | 57 ++++
.../PerNodeTimelineCollectorsAuxService.java | 214 ++++++++++++++
.../collector/TimelineCollector.java | 122 ++++++++
.../collector/TimelineCollectorManager.java | 278 +++++++++++++++++++
.../collector/TimelineCollectorWebService.java | 183 ++++++++++++
.../storage/FileSystemTimelineWriterImpl.java | 6 +-
.../TestAppLevelTimelineAggregator.java | 23 --
...estPerNodeTimelineAggregatorsAuxService.java | 150 ----------
.../TestTimelineAggregatorsCollection.java | 109 --------
.../TestAppLevelTimelineCollector.java | 23 ++
...TestPerNodeTimelineCollectorsAuxService.java | 164 +++++++++++
.../collector/TestTimelineCollectorManager.java | 118 ++++++++
.../TestFileSystemTimelineWriterImpl.java | 43 ++-
85 files changed, 2597 insertions(+), 2478 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 47351c6..216ae77 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -32,13 +32,16 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
YARN-3039. Implemented the app-level timeline aggregator discovery service.
(Junping Du via zjshen)
+ YARN-3333. Rename TimelineAggregator etc. to TimelineCollector. (Sangjin Lee
+ via junping_du)
+
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
-Trunk - Unreleased
+Trunk - Unreleased
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
index a763d39..edb42a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
@@ -45,6 +45,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index 421c2a0..6703249 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -120,7 +120,7 @@ public abstract class AllocateResponse {
response.setAMRMToken(amRMToken);
return response;
}
-
+
@Public
@Unstable
public static AllocateResponse newInstance(int responseId,
@@ -130,13 +130,13 @@ public abstract class AllocateResponse {
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
List<ContainerResourceIncrease> increasedContainers,
List<ContainerResourceDecrease> decreasedContainers,
- String aggregatorAddr) {
+ String collectorAddr) {
AllocateResponse response =
newInstance(responseId, completedContainers, allocatedContainers,
updatedNodes, availResources, command, numClusterNodes, preempt,
nmTokens, increasedContainers, decreasedContainers);
response.setAMRMToken(amRMToken);
- response.setAggregatorAddr(aggregatorAddr);
+ response.setCollectorAddr(collectorAddr);
return response;
}
@@ -323,18 +323,18 @@ public abstract class AllocateResponse {
@Private
@Unstable
public abstract void setAMRMToken(Token amRMToken);
-
+
/**
- * The address of aggregator that belong to this app
+ * The address of the collector that belongs to this app
*
+ * @return The address of the collector that belongs to this attempt
+ * @return The address of collector that belong to this attempt
*/
@Public
@Unstable
- public abstract String getAggregatorAddr();
-
+ public abstract String getCollectorAddr();
+
@Private
@Unstable
- public abstract void setAggregatorAddr(String aggregatorAddr);
-
+ public abstract void setCollectorAddr(String collectorAddr);
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
index 82ecdbd..4739d8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
@@ -47,7 +47,7 @@ public class TimelineWriteResponse {
/**
* Get a list of {@link TimelineWriteError} instances
- *
+ *
* @return a list of {@link TimelineWriteError} instances
*/
@XmlElement(name = "errors")
@@ -57,7 +57,7 @@ public class TimelineWriteResponse {
/**
* Add a single {@link TimelineWriteError} instance into the existing list
- *
+ *
* @param error
* a single {@link TimelineWriteError} instance
*/
@@ -67,7 +67,7 @@ public class TimelineWriteResponse {
/**
* Add a list of {@link TimelineWriteError} instances into the existing list
- *
+ *
* @param errors
* a list of {@link TimelineWriteError} instances
*/
@@ -77,7 +77,7 @@ public class TimelineWriteResponse {
/**
* Set the list to the given list of {@link TimelineWriteError} instances
- *
+ *
* @param errors
* a list of {@link TimelineWriteError} instances
*/
@@ -107,7 +107,7 @@ public class TimelineWriteResponse {
/**
* Get the entity Id
- *
+ *
* @return the entity Id
*/
@XmlElement(name = "entity")
@@ -117,7 +117,7 @@ public class TimelineWriteResponse {
/**
* Set the entity Id
- *
+ *
* @param entityId
* the entity Id
*/
@@ -127,7 +127,7 @@ public class TimelineWriteResponse {
/**
* Get the entity type
- *
+ *
* @return the entity type
*/
@XmlElement(name = "entitytype")
@@ -137,7 +137,7 @@ public class TimelineWriteResponse {
/**
* Set the entity type
- *
+ *
* @param entityType
* the entity type
*/
@@ -147,7 +147,7 @@ public class TimelineWriteResponse {
/**
* Get the error code
- *
+ *
* @return an error code
*/
@XmlElement(name = "errorcode")
@@ -157,7 +157,7 @@ public class TimelineWriteResponse {
/**
* Set the error code to the given error code
- *
+ *
* @param errorCode
* an error code
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a987044..73f11b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -644,10 +644,10 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20;
/** Number of threads container manager uses.*/
- public static final String NM_AGGREGATOR_SERVICE_THREAD_COUNT =
- NM_PREFIX + "aggregator-service.thread-count";
- public static final int DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT = 5;
-
+ public static final String NM_COLLECTOR_SERVICE_THREAD_COUNT = // key for the collector service thread count (see the property name below)
+ NM_PREFIX + "collector-service.thread-count";
+ public static final int DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT = 5; // default for NM_COLLECTOR_SERVICE_THREAD_COUNT
+
/** Number of threads used in cleanup.*/
public static final String NM_DELETE_THREAD_COUNT =
NM_PREFIX + "delete.thread-count";
@@ -675,13 +675,13 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" +
DEFAULT_NM_LOCALIZER_PORT;
- /** Address where the aggregator service IPC is.*/
- public static final String NM_AGGREGATOR_SERVICE_ADDRESS =
- NM_PREFIX + "aggregator-service.address";
- public static final int DEFAULT_NM_AGGREGATOR_SERVICE_PORT = 8048;
- public static final String DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS =
+ /** Address where the collector service IPC is.*/
+ public static final String NM_COLLECTOR_SERVICE_ADDRESS =
+ NM_PREFIX + "collector-service.address";
+ public static final int DEFAULT_NM_COLLECTOR_SERVICE_PORT = 8048;
+ public static final String DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS = // NOTE(review): the value below appends DEFAULT_NM_LOCALIZER_PORT, not DEFAULT_NM_COLLECTOR_SERVICE_PORT (8048) - confirm this is intended
"0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT;
-
+
/** Interval in between cache cleanups.*/
public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
NM_PREFIX + "localizer.cache.cleanup.interval-ms";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 4ae4806..95aca85 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -87,7 +87,7 @@ message AllocateResponseProto {
repeated ContainerResourceIncreaseProto increased_containers = 10;
repeated ContainerResourceDecreaseProto decreased_containers = 11;
optional hadoop.common.TokenProto am_rm_token = 12;
- optional string aggregator_addr = 13;
+ optional string collector_addr = 13;
}
enum SchedulerResourceTypes {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
index d91c67b..a0087dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
@@ -70,6 +70,16 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 3a19ac2..fe35a8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -211,12 +211,12 @@ public class ApplicationMaster {
private int appMasterRpcPort = -1;
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
-
+
private boolean newTimelineService = false;
-
+
// For posting entities in new timeline service in a non-blocking way
// TODO replace with event loop in TimelineClient.
- private static ExecutorService threadPool =
+ private static ExecutorService threadPool =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
.build());
@@ -302,9 +302,9 @@ public class ApplicationMaster {
}
appMaster.run();
result = appMaster.finish();
-
+
threadPool.shutdown();
-
+
while (!threadPool.isTerminated()) { // wait for all posting thread to finish
try {
if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
@@ -386,7 +386,7 @@ public class ApplicationMaster {
"No. of containers on which the shell command needs to be executed");
opts.addOption("priority", true, "Application Priority. Default 0");
opts.addOption("debug", false, "Dump out debug information");
- opts.addOption("timeline_service_version", true,
+ opts.addOption("timeline_service_version", true,
"Version for timeline service");
opts.addOption("help", false, "Print usage");
CommandLine cliParser = new GnuParser().parse(opts, args);
@@ -527,7 +527,7 @@ public class ApplicationMaster {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
if (cliParser.hasOption("timeline_service_version")) {
- String timelineServiceVersion =
+ String timelineServiceVersion =
cliParser.getOptionValue("timeline_service_version", "v1");
if (timelineServiceVersion.trim().equalsIgnoreCase("v1")) {
newTimelineService = false;
@@ -605,8 +605,8 @@ public class ApplicationMaster {
if(timelineClient != null) {
if (newTimelineService) {
- publishApplicationAttemptEventOnNewTimelineService(timelineClient,
- appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId,
+ publishApplicationAttemptEventOnNewTimelineService(timelineClient,
+ appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId,
appSubmitterUgi);
} else {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
@@ -620,7 +620,7 @@ public class ApplicationMaster {
// need to bind timelineClient before start.
amRMClient.registerTimelineClient(timelineClient);
amRMClient.start();
-
+
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
@@ -683,8 +683,8 @@ public class ApplicationMaster {
if(timelineClient != null) {
if (newTimelineService) {
- publishApplicationAttemptEventOnNewTimelineService(timelineClient,
- appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId,
+ publishApplicationAttemptEventOnNewTimelineService(timelineClient,
+ appAttemptID.toString(), DSEvent.DS_APP_ATTEMPT_START, domainId,
appSubmitterUgi);
} else {
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
@@ -757,7 +757,7 @@ public class ApplicationMaster {
if(timelineClient != null) {
timelineClient.stop();
}
-
+
return success;
}
@@ -1220,13 +1220,13 @@ public class ApplicationMaster {
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
-
+
private static void publishContainerStartEventOnNewTimelineService(
- final TimelineClient timelineClient, final Container container,
+ final TimelineClient timelineClient, final Container container,
final String domainId, final UserGroupInformation ugi) {
Runnable publishWrapper = new Runnable() {
public void run() {
- publishContainerStartEventOnNewTimelineServiceBase(timelineClient,
+ publishContainerStartEventOnNewTimelineServiceBase(timelineClient,
container, domainId, ugi);
}
};
@@ -1236,14 +1236,14 @@ public class ApplicationMaster {
private static void publishContainerStartEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, Container container, String domainId,
UserGroupInformation ugi) {
- final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+ final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName());
-
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
+
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setId(DSEvent.DS_CONTAINER_START.toString());
@@ -1265,29 +1265,29 @@ public class ApplicationMaster {
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
-
+
private static void publishContainerEndEventOnNewTimelineService(
final TimelineClient timelineClient, final ContainerStatus container,
final String domainId, final UserGroupInformation ugi) {
Runnable publishWrapper = new Runnable() {
public void run() {
- publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
+ publishContainerEndEventOnNewTimelineServiceBase(timelineClient,
container, domainId, ugi);
}
};
threadPool.execute(publishWrapper);
}
-
+
private static void publishContainerEndEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, final ContainerStatus container,
final String domainId, final UserGroupInformation ugi) {
- final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+ final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(container.getContainerId().toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
//entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName());
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setId(DSEvent.DS_CONTAINER_END.toString());
@@ -1312,28 +1312,28 @@ public class ApplicationMaster {
private static void publishApplicationAttemptEventOnNewTimelineService(
final TimelineClient timelineClient, final String appAttemptId,
- final DSEvent appEvent, final String domainId,
+ final DSEvent appEvent, final String domainId,
final UserGroupInformation ugi) {
-
+
Runnable publishWrapper = new Runnable() {
public void run() {
- publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient,
+ publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient,
appAttemptId, appEvent, domainId, ugi);
}
};
threadPool.execute(publishWrapper);
}
-
+
private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
final TimelineClient timelineClient, String appAttemptId,
DSEvent appEvent, String domainId, UserGroupInformation ugi) {
- final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
+ final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(appAttemptId);
entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
//entity.setDomainId(domainId);
entity.addInfo("user", ugi.getShortUserName());
- org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
event.setId(appEvent.toString());
event.setTimestamp(System.currentTimeMillis());
@@ -1355,5 +1355,5 @@ public class ApplicationMaster {
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 02a627e..934515e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -185,7 +185,7 @@ public class Client {
// Command line options
private Options opts;
-
+
private String timelineServiceVersion;
private static final String shellCommandPath = "shellCommands";
@@ -357,11 +357,11 @@ public class Client {
throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
+ " Specified virtual cores=" + amVCores);
}
-
+
if (cliParser.hasOption("timeline_service_version")) {
- timelineServiceVersion =
+ timelineServiceVersion =
cliParser.getOptionValue("timeline_service_version", "v1");
- if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") ||
+ if (! (timelineServiceVersion.trim().equalsIgnoreCase("v1") ||
timelineServiceVersion.trim().equalsIgnoreCase("v2"))) {
throw new IllegalArgumentException(
"timeline_service_version is not set properly, should be 'v1' or 'v2'");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 97d9168..0af050c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.junit.After;
import org.junit.Assert;
@@ -67,7 +67,7 @@ public class TestDistributedShell {
protected MiniYARNCluster yarnCluster = null;
protected YarnConfiguration conf = null;
private static final int NUM_NMS = 1;
- private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_aggregator";
+ private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class);
@@ -91,14 +91,14 @@ public class TestDistributedShell {
// mark if we need to launch the v1 timeline server
boolean enableATSV1 = false;
if (!currTestName.getMethodName().toLowerCase().contains("v2")) {
- // disable aux-service based timeline aggregators
+ // disable aux-service based timeline collectors
conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
enableATSV1 = true;
} else {
- // enable aux-service based timeline aggregators
+ // enable aux-service based timeline collectors
conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + TIMELINE_AUX_SERVICE_NAME
- + ".class", PerNodeTimelineAggregatorsAuxService.class.getName());
+ + ".class", PerNodeTimelineCollectorsAuxService.class.getName());
}
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 56f2b10..afb2e09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -47,7 +47,7 @@ import com.google.common.collect.ImmutableList;
public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
AbstractService {
private static final Log LOG = LogFactory.getLog(AMRMClient.class);
-
+
private TimelineClient timelineClient;
/**
@@ -382,7 +382,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
public void registerTimelineClient(TimelineClient timelineClient) {
this.timelineClient = timelineClient;
}
-
+
/**
* Get registered timeline client.
* @return
@@ -390,7 +390,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
public TimelineClient getRegisteredTimeineClient() {
return this.timelineClient;
}
-
+
/**
* Wait for <code>check</code> to return true for each 1000 ms.
* See also {@link #waitFor(com.google.common.base.Supplier, int)}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index be5610e..1a5c257 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -194,7 +194,7 @@ extends AbstractService {
* @return Current number of nodes in the cluster
*/
public abstract int getClusterNodeCount();
-
+
/**
* Register TimelineClient to AMRMClient.
* @param timelineClient
@@ -202,7 +202,7 @@ extends AbstractService {
public void registerTimelineClient(TimelineClient timelineClient) {
client.registerTimelineClient(timelineClient);
}
-
+
/**
* Get registered timeline client.
* @return
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index f0f0bc9..5351ef5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -66,8 +66,8 @@ extends AMRMClientAsync<T> {
private volatile boolean keepRunning;
private volatile float progress;
- private volatile String aggregatorAddr;
-
+ private volatile String collectorAddr;
+
private volatile Throwable savedException;
public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) {
@@ -307,15 +307,15 @@ extends AMRMClientAsync<T> {
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
}
-
- String aggregatorAddress = response.getAggregatorAddr();
+
+ String collectorAddress = response.getCollectorAddr();
TimelineClient timelineClient = client.getRegisteredTimeineClient();
- if (timelineClient != null && aggregatorAddress != null
- && !aggregatorAddress.isEmpty()) {
- if (aggregatorAddr == null ||
- !aggregatorAddr.equals(aggregatorAddress)) {
- aggregatorAddr = aggregatorAddress;
- timelineClient.setTimelineServiceAddress(aggregatorAddress);
+ if (timelineClient != null && collectorAddress != null
+ && !collectorAddress.isEmpty()) {
+ if (collectorAddr == null ||
+ !collectorAddr.equals(collectorAddress)) {
+ collectorAddr = collectorAddress;
+ timelineClient.setTimelineServiceAddress(collectorAddress);
}
}
progress = handler.getProgress();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index 605c29a..8dc473c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -384,22 +384,22 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
this.amrmToken = amRMToken;
}
-
+
@Override
- public String getAggregatorAddr() {
+ public String getCollectorAddr() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
- return p.getAggregatorAddr();
+ return p.getCollectorAddr();
}
-
+
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
+ public void setCollectorAddr(String collectorAddr) {
maybeInitBuilder();
- if (aggregatorAddr == null) {
- builder.clearAggregatorAddr();
+ if (collectorAddr == null) {
+ builder.clearCollectorAddr();
return;
}
- builder.setAggregatorAddr(aggregatorAddr);
+ builder.setCollectorAddr(collectorAddr);
}
private synchronized void initLocalIncreasedContainerList() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index 5db347e..b4c3980 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -185,5 +185,5 @@ public abstract class TimelineClient extends AbstractService {
* the timeline service address
*/
public abstract void setTimelineServiceAddress(String address);
-
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 407682d..f70cf48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -115,14 +115,14 @@ public class TimelineClientImpl extends TimelineClient {
private DelegationTokenAuthenticatedURL.Token token;
private UserGroupInformation authUgi;
private String doAsUser;
-
+
private volatile String timelineServiceAddress;
-
+
// Retry parameters for identifying new timeline service
// TODO consider to merge with connection retry
private int maxServiceRetries;
private long serviceRetryInterval;
-
+
private boolean newTimelineService = false;
@Private
@@ -322,7 +322,7 @@ public class TimelineClientImpl extends TimelineClient {
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
}
LOG.info("Timeline service address: " + getTimelineServiceAddress());
- }
+ }
super.serviceInit(conf);
}
@@ -373,16 +373,16 @@ public class TimelineClientImpl extends TimelineClient {
YarnException {
doPosting(domain, "domain");
}
-
+
// Used for new timeline service only
@Private
- public void putObjects(String path, MultivaluedMap<String, String> params,
+ public void putObjects(String path, MultivaluedMap<String, String> params,
Object obj) throws IOException, YarnException {
-
- // timelineServiceAddress could haven't be initialized yet
+
+ // timelineServiceAddress might not have been initialized yet
// or stale (only for new timeline service)
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
-
+
// timelineServiceAddress could be stale, add retry logic here.
boolean needRetry = true;
while (needRetry) {
@@ -399,13 +399,13 @@ public class TimelineClientImpl extends TimelineClient {
}
}
}
-
+
/**
* Check if reaching to maximum of retries.
* @param retries
* @param e
*/
- private void checkRetryWithSleep(int retries, Exception e) throws
+ private void checkRetryWithSleep(int retries, Exception e) throws
YarnException, IOException {
if (retries > 0) {
try {
@@ -415,8 +415,8 @@ public class TimelineClientImpl extends TimelineClient {
}
} else {
LOG.error(
- "TimelineClient has reached to max retry times :" +
- this.maxServiceRetries + " for service address: " +
+ "TimelineClient has reached to max retry times :" +
+ this.maxServiceRetries + " for service address: " +
timelineServiceAddress);
if (e instanceof YarnException) {
throw (YarnException)e;
@@ -487,12 +487,12 @@ public class TimelineClientImpl extends TimelineClient {
}
return resp;
}
-
+
@Override
public void setTimelineServiceAddress(String address) {
this.timelineServiceAddress = address;
}
-
+
private String getTimelineServiceAddress() {
return this.timelineServiceAddress;
}
@@ -629,7 +629,7 @@ public class TimelineClientImpl extends TimelineClient {
throw new YarnRuntimeException("Unknown resource type");
}
}
-
+
/**
* Poll TimelineServiceAddress for maximum of retries times if it is null
* @param retries
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 226d8ce..9ac54ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -714,10 +714,10 @@
<name>yarn.nodemanager.container-manager.thread-count</name>
<value>20</value>
</property>
-
+
<property>
- <description>Number of threads aggregator service uses.</description>
- <name>yarn.nodemanager.aggregator-service.thread-count</name>
+ <description>Number of threads collector service uses.</description>
+ <name>yarn.nodemanager.collector-service.thread-count</name>
<value>5</value>
</property>
@@ -788,11 +788,11 @@
<name>yarn.nodemanager.localizer.address</name>
<value>${yarn.nodemanager.hostname}:8040</value>
</property>
-
-
+
+
<property>
- <description>Address where the aggregator service IPC is.</description>
- <name>yarn.nodemanager.aggregator-service.address</name>
+ <description>Address where the collector service IPC is.</description>
+ <name>yarn.nodemanager.collector-service.address</name>
<value>${yarn.nodemanager.hostname}:8048</value>
</property>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
index 26d6d04..515adc8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
@@ -131,7 +131,7 @@ public class TestContainerLaunchRPC {
Assert.fail("timeout exception should have occurred!");
}
-
+
public static Token newContainerToken(NodeId nodeId, byte[] password,
ContainerTokenIdentifier tokenIdentifier) {
// RPC layer client expects ip:port as service for tokens
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
index ef0bdcc..5af0f11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
@@ -111,21 +111,21 @@ public class TestAllocateResponse {
Assert.assertEquals(0, r.getIncreasedContainers().size());
Assert.assertEquals(0, r.getDecreasedContainers().size());
}
-
+
@SuppressWarnings("deprecation")
@Test
- public void testAllocateResponseWithAggregatorAddress() {
- final String aggregatorAddr = "localhost:0";
+ public void testAllocateResponseWithCollectorAddress() {
+ final String collectorAddr = "localhost:0";
AllocateResponse r =
AllocateResponse.newInstance(3, new ArrayList<ContainerStatus>(),
new ArrayList<Container>(), new ArrayList<NodeReport>(), null,
- AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(), null,
- null, null, aggregatorAddr);
+ AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(), null,
+ null, null, collectorAddr);
AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto();
r = new AllocateResponsePBImpl(p);
// check value
- Assert.assertEquals(aggregatorAddr, r.getAggregatorAddr());
+ Assert.assertEquals(collectorAddr, r.getCollectorAddr());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index d1e4acb..4fea04c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -142,7 +142,7 @@
<include>yarn_server_common_service_protos.proto</include>
<include>ResourceTracker.proto</include>
<include>SCMUploader.proto</include>
- <include>aggregatornodemanager_protocol.proto</include>
+ <include>collectornodemanager_protocol.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java
deleted file mode 100644
index 53bdb4e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.api;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
-
-/**
- * <p>The protocol between an <code>TimelineAggregatorsCollection</code> and a
- * <code>NodeManager</code> to report a new application aggregator get launched.
- * </p>
- *
- */
-@Private
-public interface AggregatorNodemanagerProtocol {
-
- /**
- *
- * <p>
- * The <code>TimelineAggregatorsCollection</code> provides a list of mapping
- * between application and aggregator's address in
- * {@link ReportNewAggregatorsInfoRequest} to a <code>NodeManager</code> to
- * <em>register</em> aggregator's info, include: applicationId and REST URI to
- * access aggregator. NodeManager will add them into registered aggregators
- * and register them into <code>ResourceManager</code> afterwards.
- * </p>
- *
- * @param request the request of registering a new aggregator or a list of aggregators
- * @return
- * @throws YarnException
- * @throws IOException
- */
- ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
- ReportNewAggregatorsInfoRequest request)
- throws YarnException, IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java
deleted file mode 100644
index 4df80a5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.api;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.yarn.proto.AggregatorNodemanagerProtocol.AggregatorNodemanagerProtocolService;
-
-@Private
-@Unstable
-@ProtocolInfo(
- protocolName = "org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB",
- protocolVersion = 1)
-public interface AggregatorNodemanagerProtocolPB extends
- AggregatorNodemanagerProtocolService.BlockingInterface {
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
new file mode 100644
index 0000000..26c121a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+
+/**
+ * <p>The protocol between a <code>TimelineCollectorManager</code> and a
+ * <code>NodeManager</code> to report a newly launched application collector.
+ * </p>
+ *
+ */
+@Private
+public interface CollectorNodemanagerProtocol {
+
+ /**
+ *
+ * <p>
+ * The <code>TimelineCollectorManager</code> provides a list of mappings
+ * between applications and collector addresses in
+ * {@link ReportNewCollectorInfoRequest} to a <code>NodeManager</code> to
+ * <em>register</em> the collectors' info, including the applicationId and
+ * the REST URI to access each collector. The NodeManager adds them to its
+ * registered collectors, then registers them with <code>ResourceManager</code>.
+ * </p>
+ *
+ * @param request the request of registering a new collector or a list of
+ * collectors
+ * @return the response to the report request
+ * @throws YarnException
+ * @throws IOException
+ */
+ ReportNewCollectorInfoResponse reportNewCollectorInfo(
+ ReportNewCollectorInfoRequest request)
+ throws YarnException, IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java
new file mode 100644
index 0000000..655e989
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.CollectorNodemanagerProtocol.CollectorNodemanagerProtocolService;
+
+@Private
+@Unstable
+@ProtocolInfo(
+ protocolName = "org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB",
+ protocolVersion = 1)
+public interface CollectorNodemanagerProtocolPB extends
+ CollectorNodemanagerProtocolService.BlockingInterface {
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java
deleted file mode 100644
index 6e777e7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.api.impl.pb.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
-
-import com.google.protobuf.ServiceException;
-
-public class AggregatorNodemanagerProtocolPBClientImpl implements
- AggregatorNodemanagerProtocol, Closeable {
-
- // Not a documented config. Only used for tests internally
- static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
- + "rpc.nm-command-timeout";
-
- /**
- * Maximum of 1 minute timeout for a Node to react to the command
- */
- static final int DEFAULT_COMMAND_TIMEOUT = 60000;
-
- private AggregatorNodemanagerProtocolPB proxy;
-
- @Private
- public AggregatorNodemanagerProtocolPBClientImpl(long clientVersion,
- InetSocketAddress addr, Configuration conf) throws IOException {
- RPC.setProtocolEngine(conf, AggregatorNodemanagerProtocolPB.class,
- ProtobufRpcEngine.class);
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
- int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
- proxy =
- (AggregatorNodemanagerProtocolPB) RPC.getProxy(
- AggregatorNodemanagerProtocolPB.class,
- clientVersion, addr, ugi, conf,
- NetUtils.getDefaultSocketFactory(conf), expireIntvl);
- }
-
- @Override
- public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
- ReportNewAggregatorsInfoRequest request) throws YarnException, IOException {
-
- ReportNewAggregatorsInfoRequestProto requestProto =
- ((ReportNewAggregatorsInfoRequestPBImpl) request).getProto();
- try {
- return new ReportNewAggregatorsInfoResponsePBImpl(
- proxy.reportNewAggregatorInfo(null, requestProto));
- } catch (ServiceException e) {
- RPCUtil.unwrapAndThrowException(e);
- return null;
- }
- }
-
- @Override
- public void close() {
- if (this.proxy != null) {
- RPC.stopProxy(this.proxy);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
new file mode 100644
index 0000000..276a540
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
+
+import com.google.protobuf.ServiceException;
+
+public class CollectorNodemanagerProtocolPBClientImpl implements
+ CollectorNodemanagerProtocol, Closeable {
+
+ // Not a documented config. Only used for tests internally
+ static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
+ + "rpc.nm-command-timeout";
+
+ /**
+ * Maximum of 1 minute timeout for a Node to react to the command
+ */
+ static final int DEFAULT_COMMAND_TIMEOUT = 60000;
+
+ private CollectorNodemanagerProtocolPB proxy;
+
+ @Private
+ public CollectorNodemanagerProtocolPBClientImpl(long clientVersion,
+ InetSocketAddress addr, Configuration conf) throws IOException {
+ RPC.setProtocolEngine(conf, CollectorNodemanagerProtocolPB.class,
+ ProtobufRpcEngine.class);
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
+ proxy =
+ (CollectorNodemanagerProtocolPB) RPC.getProxy(
+ CollectorNodemanagerProtocolPB.class,
+ clientVersion, addr, ugi, conf,
+ NetUtils.getDefaultSocketFactory(conf), expireIntvl);
+ }
+
+ @Override
+ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+ ReportNewCollectorInfoRequest request) throws YarnException, IOException {
+
+ ReportNewCollectorInfoRequestProto requestProto =
+ ((ReportNewCollectorInfoRequestPBImpl) request).getProto();
+ try {
+ return new ReportNewCollectorInfoResponsePBImpl(
+ proxy.reportNewCollectorInfo(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.proxy != null) {
+ RPC.stopProxy(this.proxy);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java
deleted file mode 100644
index 87bce16..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.api.impl.pb.service;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
-import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
-
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-
-public class AggregatorNodemanagerProtocolPBServiceImpl implements
- AggregatorNodemanagerProtocolPB {
-
- private AggregatorNodemanagerProtocol real;
-
- public AggregatorNodemanagerProtocolPBServiceImpl(AggregatorNodemanagerProtocol impl) {
- this.real = impl;
- }
-
- @Override
- public ReportNewAggregatorsInfoResponseProto reportNewAggregatorInfo(
- RpcController arg0, ReportNewAggregatorsInfoRequestProto proto)
- throws ServiceException {
- ReportNewAggregatorsInfoRequestPBImpl request =
- new ReportNewAggregatorsInfoRequestPBImpl(proto);
- try {
- ReportNewAggregatorsInfoResponse response = real.reportNewAggregatorInfo(request);
- return ((ReportNewAggregatorsInfoResponsePBImpl)response).getProto();
- } catch (YarnException e) {
- throw new ServiceException(e);
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
new file mode 100644
index 0000000..3f42732
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class CollectorNodemanagerProtocolPBServiceImpl implements
+ CollectorNodemanagerProtocolPB {
+
+ private CollectorNodemanagerProtocol real;
+
+ public CollectorNodemanagerProtocolPBServiceImpl(CollectorNodemanagerProtocol impl) {
+ this.real = impl;
+ }
+
+ @Override
+ public ReportNewCollectorInfoResponseProto reportNewCollectorInfo(
+ RpcController arg0, ReportNewCollectorInfoRequestProto proto)
+ throws ServiceException {
+ ReportNewCollectorInfoRequestPBImpl request =
+ new ReportNewCollectorInfoRequestPBImpl(proto);
+ try {
+ ReportNewCollectorInfoResponse response = real.reportNewCollectorInfo(request);
+ return ((ReportNewCollectorInfoResponsePBImpl)response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index 0b020b7..2f080d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
-import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -40,11 +39,11 @@ public abstract class NodeHeartbeatRequest {
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
return nodeHeartbeatRequest;
}
-
+
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey,
- Map<ApplicationId, String> registeredAggregators) {
+ Map<ApplicationId, String> registeredCollectors) {
NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
@@ -52,7 +51,7 @@ public abstract class NodeHeartbeatRequest {
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
nodeHeartbeatRequest
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
- nodeHeartbeatRequest.setRegisteredAggregators(registeredAggregators);
+ nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors);
return nodeHeartbeatRequest;
}
@@ -64,8 +63,9 @@ public abstract class NodeHeartbeatRequest {
public abstract MasterKey getLastKnownNMTokenMasterKey();
public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
-
- // This tells RM registered aggregators' address info on this node
- public abstract Map<ApplicationId, String> getRegisteredAggregators();
- public abstract void setRegisteredAggregators(Map<ApplicationId, String> appAggregatorsMap);
+
+ // This tells RM registered collectors' address info on this node
+ public abstract Map<ApplicationId, String> getRegisteredCollectors();
+ public abstract void setRegisteredCollectors(Map<ApplicationId,
+ String> appCollectorsMap);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 262ca07..167a9cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -35,10 +35,10 @@ public interface NodeHeartbeatResponse {
List<ContainerId> getContainersToBeRemovedFromNM();
List<ApplicationId> getApplicationsToCleanup();
-
- // This tells NM the aggregators' address info of related Apps
- Map<ApplicationId, String> getAppAggregatorsMap();
- void setAppAggregatorsMap(Map<ApplicationId, String> appAggregatorsMap);
+
+ // This tells NM the collectors' address info of related apps
+ Map<ApplicationId, String> getAppCollectorsMap();
+ void setAppCollectorsMap(Map<ApplicationId, String> appCollectorsMap);
void setResponseId(int responseId);
void setNodeAction(NodeAction action);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java
deleted file mode 100644
index ae538a2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.api.protocolrecords;
-
-import java.util.List;
-import java.util.Arrays;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
-import org.apache.hadoop.yarn.util.Records;
-
-@Private
-public abstract class ReportNewAggregatorsInfoRequest {
-
- public static ReportNewAggregatorsInfoRequest newInstance(
- List<AppAggregatorsMap> appAggregatorsList) {
- ReportNewAggregatorsInfoRequest request =
- Records.newRecord(ReportNewAggregatorsInfoRequest.class);
- request.setAppAggregatorsList(appAggregatorsList);
- return request;
- }
-
- public static ReportNewAggregatorsInfoRequest newInstance(
- ApplicationId id, String aggregatorAddr) {
- ReportNewAggregatorsInfoRequest request =
- Records.newRecord(ReportNewAggregatorsInfoRequest.class);
- request.setAppAggregatorsList(
- Arrays.asList(AppAggregatorsMap.newInstance(id, aggregatorAddr)));
- return request;
- }
-
- public abstract List<AppAggregatorsMap> getAppAggregatorsList();
-
- public abstract void setAppAggregatorsList(
- List<AppAggregatorsMap> appAggregatorsList);
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java
deleted file mode 100644
index 3b847d6..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.api.protocolrecords;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.yarn.util.Records;
-
-public abstract class ReportNewAggregatorsInfoResponse {
-
- @Private
- public static ReportNewAggregatorsInfoResponse newInstance() {
- ReportNewAggregatorsInfoResponse response =
- Records.newRecord(ReportNewAggregatorsInfoResponse.class);
- return response;
- }
-
-}
[3/4] hadoop git commit: YARN-3333. Rename TimelineAggregator etc. to
TimelineCollector. Contributed by Sangjin Lee
Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
new file mode 100644
index 0000000..3498de9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import java.util.List;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+public abstract class ReportNewCollectorInfoRequest {
+
+ public static ReportNewCollectorInfoRequest newInstance(
+ List<AppCollectorsMap> appCollectorsList) {
+ ReportNewCollectorInfoRequest request =
+ Records.newRecord(ReportNewCollectorInfoRequest.class);
+ request.setAppCollectorsList(appCollectorsList);
+ return request;
+ }
+
+ public static ReportNewCollectorInfoRequest newInstance(
+ ApplicationId id, String collectorAddr) {
+ ReportNewCollectorInfoRequest request =
+ Records.newRecord(ReportNewCollectorInfoRequest.class);
+ request.setAppCollectorsList(
+ Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr)));
+ return request;
+ }
+
+ public abstract List<AppCollectorsMap> getAppCollectorsList();
+
+ public abstract void setAppCollectorsList(
+ List<AppCollectorsMap> appCollectorsList);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java
new file mode 100644
index 0000000..4157c47
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class ReportNewCollectorInfoResponse {
+
+ @Private
+ public static ReportNewCollectorInfoResponse newInstance() {
+ ReportNewCollectorInfoResponse response =
+ Records.newRecord(ReportNewCollectorInfoResponse.class);
+ return response;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 39eaabd..5bc21e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -44,7 +44,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private NodeStatus nodeStatus = null;
private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null;
- Map<ApplicationId, String> registeredAggregators = null;
+ Map<ApplicationId, String> registeredCollectors = null;
public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder();
@@ -89,19 +89,19 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
builder.setLastKnownNmTokenMasterKey(
convertToProtoFormat(this.lastKnownNMTokenMasterKey));
}
-
- if (this.registeredAggregators != null) {
- addRegisteredAggregatorsToProto();
+
+ if (this.registeredCollectors != null) {
+ addRegisteredCollectorsToProto();
}
}
-
- private void addRegisteredAggregatorsToProto() {
+
+ private void addRegisteredCollectorsToProto() {
maybeInitBuilder();
- builder.clearRegisteredAggregators();
- for (Map.Entry<ApplicationId, String> entry : registeredAggregators.entrySet()) {
- builder.addRegisteredAggregators(AppAggregatorsMapProto.newBuilder()
+ builder.clearRegisteredCollectors();
+ for (Map.Entry<ApplicationId, String> entry : registeredCollectors.entrySet()) {
+ builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
- .setAppAggregatorAddr(entry.getValue()));
+ .setAppCollectorAddr(entry.getValue()));
}
}
@@ -185,35 +185,35 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
builder.clearLastKnownNmTokenMasterKey();
this.lastKnownNMTokenMasterKey = masterKey;
}
-
+
@Override
- public Map<ApplicationId, String> getRegisteredAggregators() {
- if (this.registeredAggregators != null) {
- return this.registeredAggregators;
+ public Map<ApplicationId, String> getRegisteredCollectors() {
+ if (this.registeredCollectors != null) {
+ return this.registeredCollectors;
}
- initRegisteredAggregators();
- return registeredAggregators;
+ initRegisteredCollectors();
+ return registeredCollectors;
}
-
- private void initRegisteredAggregators() {
+
+ private void initRegisteredCollectors() {
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
- List<AppAggregatorsMapProto> list = p.getRegisteredAggregatorsList();
- this.registeredAggregators = new HashMap<ApplicationId, String> ();
- for (AppAggregatorsMapProto c : list) {
+ List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList();
+ this.registeredCollectors = new HashMap<ApplicationId, String> ();
+ for (AppCollectorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
- this.registeredAggregators.put(appId, c.getAppAggregatorAddr());
+ this.registeredCollectors.put(appId, c.getAppCollectorAddr());
}
}
-
+
@Override
- public void setRegisteredAggregators(
- Map<ApplicationId, String> registeredAggregators) {
- if (registeredAggregators == null || registeredAggregators.isEmpty()) {
+ public void setRegisteredCollectors(
+ Map<ApplicationId, String> registeredCollectors) {
+ if (registeredCollectors == null || registeredCollectors.isEmpty()) {
return;
}
maybeInitBuilder();
- this.registeredAggregators = new HashMap<ApplicationId, String>();
- this.registeredAggregators.putAll(registeredAggregators);
+ this.registeredCollectors = new HashMap<ApplicationId, String>();
+ this.registeredCollectors.putAll(registeredCollectors);
}
private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {
@@ -223,11 +223,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private NodeStatusProto convertToProtoFormat(NodeStatus t) {
return ((NodeStatusPBImpl)t).getProto();
}
-
+
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
-
+
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 019b2ae..1fd02a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
@@ -56,8 +56,8 @@ public class NodeHeartbeatResponsePBImpl extends
private List<ContainerId> containersToBeRemovedFromNM = null;
private List<ApplicationId> applicationsToCleanup = null;
private Map<ApplicationId, ByteBuffer> systemCredentials = null;
-
- Map<ApplicationId, String> appAggregatorsMap = null;
+
+ Map<ApplicationId, String> appCollectorsMap = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@@ -99,9 +99,9 @@ public class NodeHeartbeatResponsePBImpl extends
if (this.systemCredentials != null) {
addSystemCredentialsToProto();
}
-
- if (this.appAggregatorsMap != null) {
- addAppAggregatorsMapToProto();
+
+ if (this.appCollectorsMap != null) {
+ addAppCollectorsMapToProto();
}
}
@@ -115,14 +115,14 @@ public class NodeHeartbeatResponsePBImpl extends
entry.getValue().duplicate())));
}
}
-
- private void addAppAggregatorsMapToProto() {
+
+ private void addAppCollectorsMapToProto() {
maybeInitBuilder();
- builder.clearAppAggregatorsMap();
- for (Map.Entry<ApplicationId, String> entry : appAggregatorsMap.entrySet()) {
- builder.addAppAggregatorsMap(AppAggregatorsMapProto.newBuilder()
+ builder.clearAppCollectorsMap();
+ for (Map.Entry<ApplicationId, String> entry : appCollectorsMap.entrySet()) {
+ builder.addAppCollectorsMap(AppCollectorsMapProto.newBuilder()
.setAppId(convertToProtoFormat(entry.getKey()))
- .setAppAggregatorAddr(entry.getValue()));
+ .setAppCollectorAddr(entry.getValue()));
}
}
@@ -434,14 +434,14 @@ public class NodeHeartbeatResponsePBImpl extends
initSystemCredentials();
return systemCredentials;
}
-
+
@Override
- public Map<ApplicationId, String> getAppAggregatorsMap() {
- if (this.appAggregatorsMap != null) {
- return this.appAggregatorsMap;
+ public Map<ApplicationId, String> getAppCollectorsMap() {
+ if (this.appCollectorsMap != null) {
+ return this.appCollectorsMap;
}
- initAppAggregatorsMap();
- return appAggregatorsMap;
+ initAppCollectorsMap();
+ return appCollectorsMap;
}
private void initSystemCredentials() {
@@ -454,14 +454,14 @@ public class NodeHeartbeatResponsePBImpl extends
this.systemCredentials.put(appId, byteBuffer);
}
}
-
- private void initAppAggregatorsMap() {
+
+ private void initAppCollectorsMap() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
- List<AppAggregatorsMapProto> list = p.getAppAggregatorsMapList();
- this.appAggregatorsMap = new HashMap<ApplicationId, String> ();
- for (AppAggregatorsMapProto c : list) {
+ List<AppCollectorsMapProto> list = p.getAppCollectorsMapList();
+ this.appCollectorsMap = new HashMap<ApplicationId, String> ();
+ for (AppCollectorsMapProto c : list) {
ApplicationId appId = convertFromProtoFormat(c.getAppId());
- this.appAggregatorsMap.put(appId, c.getAppAggregatorAddr());
+ this.appCollectorsMap.put(appId, c.getAppCollectorAddr());
}
}
@@ -475,16 +475,16 @@ public class NodeHeartbeatResponsePBImpl extends
this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
this.systemCredentials.putAll(systemCredentials);
}
-
+
@Override
- public void setAppAggregatorsMap(
- Map<ApplicationId, String> appAggregatorsMap) {
- if (appAggregatorsMap == null || appAggregatorsMap.isEmpty()) {
+ public void setAppCollectorsMap(
+ Map<ApplicationId, String> appCollectorsMap) {
+ if (appCollectorsMap == null || appCollectorsMap.isEmpty()) {
return;
}
maybeInitBuilder();
- this.appAggregatorsMap = new HashMap<ApplicationId, String>();
- this.appAggregatorsMap.putAll(appAggregatorsMap);
+ this.appCollectorsMap = new HashMap<ApplicationId, String>();
+ this.appCollectorsMap.putAll(appCollectorsMap);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
deleted file mode 100644
index eb7beef..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProtoOrBuilder;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.AppAggregatorsMapPBImpl;
-
-public class ReportNewAggregatorsInfoRequestPBImpl extends
- ReportNewAggregatorsInfoRequest {
-
- ReportNewAggregatorsInfoRequestProto proto =
- ReportNewAggregatorsInfoRequestProto.getDefaultInstance();
-
- ReportNewAggregatorsInfoRequestProto.Builder builder = null;
- boolean viaProto = false;
-
- private List<AppAggregatorsMap> aggregatorsList = null;
-
- public ReportNewAggregatorsInfoRequestPBImpl() {
- builder = ReportNewAggregatorsInfoRequestProto.newBuilder();
- }
-
- public ReportNewAggregatorsInfoRequestPBImpl(
- ReportNewAggregatorsInfoRequestProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- public ReportNewAggregatorsInfoRequestProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
- }
-
- @Override
- public int hashCode() {
- return getProto().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null)
- return false;
- if (other.getClass().isAssignableFrom(this.getClass())) {
- return this.getProto().equals(this.getClass().cast(other).getProto());
- }
- return false;
- }
-
- private void mergeLocalToProto() {
- if (viaProto)
- maybeInitBuilder();
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
-
- private void mergeLocalToBuilder() {
- if (aggregatorsList != null) {
- addLocalAggregatorsToProto();
- }
- }
-
- private void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = ReportNewAggregatorsInfoRequestProto.newBuilder(proto);
- }
- viaProto = false;
- }
-
- private void addLocalAggregatorsToProto() {
- maybeInitBuilder();
- builder.clearAppAggregators();
- List<AppAggregatorsMapProto> protoList =
- new ArrayList<AppAggregatorsMapProto>();
- for (AppAggregatorsMap m : this.aggregatorsList) {
- protoList.add(convertToProtoFormat(m));
- }
- builder.addAllAppAggregators(protoList);
- }
-
- private void initLocalAggregatorsList() {
- ReportNewAggregatorsInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
- List<AppAggregatorsMapProto> aggregatorsList =
- p.getAppAggregatorsList();
- this.aggregatorsList = new ArrayList<AppAggregatorsMap>();
- for (AppAggregatorsMapProto m : aggregatorsList) {
- this.aggregatorsList.add(convertFromProtoFormat(m));
- }
- }
-
- @Override
- public List<AppAggregatorsMap> getAppAggregatorsList() {
- if (this.aggregatorsList == null) {
- initLocalAggregatorsList();
- }
- return this.aggregatorsList;
- }
-
- @Override
- public void setAppAggregatorsList(List<AppAggregatorsMap> appAggregatorsList) {
- maybeInitBuilder();
- if (appAggregatorsList == null) {
- builder.clearAppAggregators();
- }
- this.aggregatorsList = appAggregatorsList;
- }
-
- private AppAggregatorsMapPBImpl convertFromProtoFormat(
- AppAggregatorsMapProto p) {
- return new AppAggregatorsMapPBImpl(p);
- }
-
- private AppAggregatorsMapProto convertToProtoFormat(
- AppAggregatorsMap m) {
- return ((AppAggregatorsMapPBImpl) m).getProto();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
deleted file mode 100644
index 0f0925a..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
-
-import com.google.protobuf.TextFormat;
-
-@Private
-@Unstable
-public class ReportNewAggregatorsInfoResponsePBImpl extends
- ReportNewAggregatorsInfoResponse {
-
- ReportNewAggregatorsInfoResponseProto proto =
- ReportNewAggregatorsInfoResponseProto.getDefaultInstance();
-
- ReportNewAggregatorsInfoResponseProto.Builder builder = null;
-
- boolean viaProto = false;
-
- public ReportNewAggregatorsInfoResponsePBImpl() {
- builder = ReportNewAggregatorsInfoResponseProto.newBuilder();
- }
-
- public ReportNewAggregatorsInfoResponsePBImpl(ReportNewAggregatorsInfoResponseProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- public ReportNewAggregatorsInfoResponseProto getProto() {
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
- }
-
- @Override
- public int hashCode() {
- return getProto().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null)
- return false;
- if (other.getClass().isAssignableFrom(this.getClass())) {
- return this.getProto().equals(this.getClass().cast(other).getProto());
- }
- return false;
- }
-
- @Override
- public String toString() {
- return TextFormat.shortDebugString(getProto());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
new file mode 100644
index 0000000..5dd8f17
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl;
+
+public class ReportNewCollectorInfoRequestPBImpl extends
+ ReportNewCollectorInfoRequest {
+
+ ReportNewCollectorInfoRequestProto proto =
+ ReportNewCollectorInfoRequestProto.getDefaultInstance();
+
+ ReportNewCollectorInfoRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private List<AppCollectorsMap> collectorsList = null;
+
+ public ReportNewCollectorInfoRequestPBImpl() {
+ builder = ReportNewCollectorInfoRequestProto.newBuilder();
+ }
+
+ public ReportNewCollectorInfoRequestPBImpl(
+ ReportNewCollectorInfoRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ReportNewCollectorInfoRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (collectorsList != null) {
+ addLocalCollectorsToProto();
+ }
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ReportNewCollectorInfoRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void addLocalCollectorsToProto() {
+ maybeInitBuilder();
+ builder.clearAppCollectors();
+ List<AppCollectorsMapProto> protoList =
+ new ArrayList<AppCollectorsMapProto>();
+ for (AppCollectorsMap m : this.collectorsList) {
+ protoList.add(convertToProtoFormat(m));
+ }
+ builder.addAllAppCollectors(protoList);
+ }
+
+ private void initLocalCollectorsList() {
+ ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List<AppCollectorsMapProto> collectorsList =
+ p.getAppCollectorsList();
+ this.collectorsList = new ArrayList<AppCollectorsMap>();
+ for (AppCollectorsMapProto m : collectorsList) {
+ this.collectorsList.add(convertFromProtoFormat(m));
+ }
+ }
+
+ @Override
+ public List<AppCollectorsMap> getAppCollectorsList() {
+ if (this.collectorsList == null) {
+ initLocalCollectorsList();
+ }
+ return this.collectorsList;
+ }
+
+ @Override
+ public void setAppCollectorsList(List<AppCollectorsMap> appCollectorsList) {
+ maybeInitBuilder();
+ if (appCollectorsList == null) {
+ builder.clearAppCollectors();
+ }
+ this.collectorsList = appCollectorsList;
+ }
+
+ private AppCollectorsMapPBImpl convertFromProtoFormat(
+ AppCollectorsMapProto p) {
+ return new AppCollectorsMapPBImpl(p);
+ }
+
+ private AppCollectorsMapProto convertToProtoFormat(
+ AppCollectorsMap m) {
+ return ((AppCollectorsMapPBImpl) m).getProto();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java
new file mode 100644
index 0000000..7c90675
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol-buffer backed implementation of
+ * {@link ReportNewCollectorInfoResponse}. The class keeps no state beyond
+ * the proto/builder pair.
+ */
+@Private
+@Unstable
+public class ReportNewCollectorInfoResponsePBImpl extends
+    ReportNewCollectorInfoResponse {
+
+  ReportNewCollectorInfoResponseProto proto =
+      ReportNewCollectorInfoResponseProto.getDefaultInstance();
+
+  ReportNewCollectorInfoResponseProto.Builder builder = null;
+
+  // true when 'proto' is the current representation; false when 'builder' is
+  boolean viaProto = false;
+
+  /** Creates an empty response backed by a fresh builder. */
+  public ReportNewCollectorInfoResponsePBImpl() {
+    builder = ReportNewCollectorInfoResponseProto.newBuilder();
+  }
+
+  /**
+   * Creates a response backed by an existing proto message.
+   *
+   * @param proto the message to wrap
+   */
+  public ReportNewCollectorInfoResponsePBImpl(
+      ReportNewCollectorInfoResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Returns the proto form of this response, building it from the builder
+   * if the builder is the current representation.
+   *
+   * @return the proto message
+   */
+  public ReportNewCollectorInfoResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Two responses are equal when they are of exactly the same class and
+   * their proto messages are equal.
+   */
+  @Override
+  public boolean equals(Object other) {
+    // Exact-class comparison: the previous isAssignableFrom() test also
+    // accepted instances of a superclass (e.g. a plain Object) and then
+    // failed with ClassCastException on the cast instead of returning false.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(
+        ((ReportNewCollectorInfoResponsePBImpl) other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
deleted file mode 100644
index 67c377d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.hadoop.yarn.server.api.records;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.Records;
-
-
-@Private
-public abstract class AppAggregatorsMap {
-
- public static AppAggregatorsMap newInstance(
- ApplicationId id, String aggregatorAddr) {
- AppAggregatorsMap appAggregatorMap =
- Records.newRecord(AppAggregatorsMap.class);
- appAggregatorMap.setApplicationId(id);
- appAggregatorMap.setAggregatorAddr(aggregatorAddr);
- return appAggregatorMap;
- }
-
- public abstract ApplicationId getApplicationId();
-
- public abstract void setApplicationId(
- ApplicationId id);
-
- public abstract String getAggregatorAddr();
-
- public abstract void setAggregatorAddr(
- String addr);
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
new file mode 100644
index 0000000..07e1d92
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ * Record pairing an application id with the address of its collector.
+ */
+@Private
+public abstract class AppCollectorsMap {
+
+  /**
+   * Creates a record through {@link Records#newRecord(Class)} and populates
+   * both of its fields.
+   *
+   * @param id the application id to store
+   * @param collectorAddr the collector address to store
+   * @return the populated record
+   */
+  public static AppCollectorsMap newInstance(
+      ApplicationId id, String collectorAddr) {
+    AppCollectorsMap mapping = Records.newRecord(AppCollectorsMap.class);
+    mapping.setApplicationId(id);
+    mapping.setCollectorAddr(collectorAddr);
+    return mapping;
+  }
+
+  /** @return the application id held by this record */
+  public abstract ApplicationId getApplicationId();
+
+  /** @param id the application id to hold */
+  public abstract void setApplicationId(ApplicationId id);
+
+  /** @return the collector address held by this record */
+  public abstract String getCollectorAddr();
+
+  /** @param addr the collector address to hold */
+  public abstract void setCollectorAddr(String addr);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
deleted file mode 100644
index 32903e2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.yarn.server.api.records.impl.pb;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
-
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProtoOrBuilder;
-
-import com.google.protobuf.TextFormat;
-
-@Private
-@Unstable
-public class AppAggregatorsMapPBImpl extends AppAggregatorsMap {
-
- AppAggregatorsMapProto proto =
- AppAggregatorsMapProto.getDefaultInstance();
-
- AppAggregatorsMapProto.Builder builder = null;
- boolean viaProto = false;
-
- private ApplicationId appId = null;
- private String aggregatorAddr = null;
-
- public AppAggregatorsMapPBImpl() {
- builder = AppAggregatorsMapProto.newBuilder();
- }
-
- public AppAggregatorsMapPBImpl(AppAggregatorsMapProto proto) {
- this.proto = proto;
- viaProto = true;
- }
-
- public AppAggregatorsMapProto getProto() {
- mergeLocalToProto();
- proto = viaProto ? proto : builder.build();
- viaProto = true;
- return proto;
- }
-
- @Override
- public int hashCode() {
- return getProto().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null)
- return false;
- if (other.getClass().isAssignableFrom(this.getClass())) {
- return this.getProto().equals(this.getClass().cast(other).getProto());
- }
- return false;
- }
-
- @Override
- public String toString() {
- return TextFormat.shortDebugString(getProto());
- }
-
- @Override
- public ApplicationId getApplicationId() {
- AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
- if (this.appId == null && p.hasAppId()) {
- this.appId = convertFromProtoFormat(p.getAppId());
- }
- return this.appId;
- }
-
- @Override
- public String getAggregatorAddr() {
- AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
- if (this.aggregatorAddr == null
- && p.hasAppAggregatorAddr()) {
- this.aggregatorAddr = p.getAppAggregatorAddr();
- }
- return this.aggregatorAddr;
- }
-
- @Override
- public void setApplicationId(ApplicationId appId) {
- maybeInitBuilder();
- if (appId == null) {
- builder.clearAppId();
- }
- this.appId = appId;
- }
-
- @Override
- public void setAggregatorAddr(String aggregatorAddr) {
- maybeInitBuilder();
- if (aggregatorAddr == null) {
- builder.clearAppAggregatorAddr();
- }
- this.aggregatorAddr = aggregatorAddr;
- }
-
- private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
- return new ApplicationIdPBImpl(p);
- }
-
- private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
- return ((ApplicationIdPBImpl) t).getProto();
- }
-
- private void maybeInitBuilder() {
- if (viaProto || builder == null) {
- builder = AppAggregatorsMapProto.newBuilder(proto);
- }
- viaProto = false;
- }
-
- private void mergeLocalToProto() {
- if (viaProto) {
- maybeInitBuilder();
- }
- mergeLocalToBuilder();
- proto = builder.build();
- viaProto = true;
- }
-
- private void mergeLocalToBuilder() {
- if (this.appId != null) {
- builder.setAppId(convertToProtoFormat(this.appId));
- }
- if (this.aggregatorAddr != null) {
- builder.setAppAggregatorAddr(this.aggregatorAddr);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
new file mode 100644
index 0000000..eb3bde4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
@@ -0,0 +1,151 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol-buffer backed implementation of {@link AppCollectorsMap}.
+ *
+ * <p>Values passed to the setters are cached in local fields and are only
+ * written into the builder when {@link #getProto()} is called.
+ */
+@Private
+@Unstable
+public class AppCollectorsMapPBImpl extends AppCollectorsMap {
+
+  AppCollectorsMapProto proto =
+      AppCollectorsMapProto.getDefaultInstance();
+
+  AppCollectorsMapProto.Builder builder = null;
+  // true when 'proto' is the current representation; false when 'builder' is
+  boolean viaProto = false;
+
+  // Locally cached field values; null means "not cached / not set locally".
+  private ApplicationId appId = null;
+  private String collectorAddr = null;
+
+  /** Creates an empty record backed by a fresh builder. */
+  public AppCollectorsMapPBImpl() {
+    builder = AppCollectorsMapProto.newBuilder();
+  }
+
+  /**
+   * Creates a record backed by an existing proto message.
+   *
+   * @param proto the message to wrap
+   */
+  public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merges the locally cached values into the message and returns it.
+   *
+   * @return the proto message reflecting the current field values
+   */
+  public AppCollectorsMapProto getProto() {
+    // mergeLocalToProto() always finishes by rebuilding 'proto' and setting
+    // viaProto, so no further build step is needed here.
+    mergeLocalToProto();
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Two records are equal when they are of exactly the same class and their
+   * proto messages are equal.
+   */
+  @Override
+  public boolean equals(Object other) {
+    // Exact-class comparison: the previous isAssignableFrom() test also
+    // accepted instances of a superclass (e.g. a plain Object) and then
+    // failed with ClassCastException on the cast instead of returning false.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(
+        ((AppCollectorsMapPBImpl) other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  /**
+   * Returns the application id, reading it from the backing message the
+   * first time if it is present there.
+   *
+   * @return the application id, or {@code null} if none is set
+   */
+  @Override
+  public ApplicationId getApplicationId() {
+    AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.appId == null && p.hasAppId()) {
+      this.appId = convertFromProtoFormat(p.getAppId());
+    }
+    return this.appId;
+  }
+
+  /**
+   * Returns the collector address, reading it from the backing message the
+   * first time if it is present there.
+   *
+   * @return the collector address, or {@code null} if none is set
+   */
+  @Override
+  public String getCollectorAddr() {
+    AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.collectorAddr == null
+        && p.hasAppCollectorAddr()) {
+      this.collectorAddr = p.getAppCollectorAddr();
+    }
+    return this.collectorAddr;
+  }
+
+  /**
+   * Sets the application id. Passing {@code null} also clears the value
+   * held by the builder.
+   */
+  @Override
+  public void setApplicationId(ApplicationId appId) {
+    maybeInitBuilder();
+    if (appId == null) {
+      builder.clearAppId();
+    }
+    this.appId = appId;
+  }
+
+  /**
+   * Sets the collector address. Passing {@code null} also clears the value
+   * held by the builder.
+   */
+  @Override
+  public void setCollectorAddr(String collectorAddr) {
+    maybeInitBuilder();
+    if (collectorAddr == null) {
+      builder.clearAppCollectorAddr();
+    }
+    this.collectorAddr = collectorAddr;
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  // Requires the id to be an ApplicationIdPBImpl; other types fail the cast.
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  /** Switches to builder mode, seeding a new builder from 'proto' if needed. */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = AppCollectorsMapProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Writes the cached values into the builder and rebuilds 'proto'. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Copies every non-null cached value into the builder. */
+  private void mergeLocalToBuilder() {
+    if (this.appId != null) {
+      builder.setAppId(convertToProtoFormat(this.appId));
+    }
+    if (this.collectorAddr != null) {
+      builder.setAppCollectorAddr(this.collectorAddr);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
deleted file mode 100644
index d7b05c1..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "AggregatorNodemanagerProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-package hadoop.yarn;
-
-import "yarn_server_common_service_protos.proto";
-
-service AggregatorNodemanagerProtocolService {
- rpc reportNewAggregatorInfo (ReportNewAggregatorsInfoRequestProto) returns (ReportNewAggregatorsInfoResponseProto);
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
new file mode 100644
index 0000000..654a9f2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "CollectorNodemanagerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_server_common_service_protos.proto";
+
+// RPC service with a single call, reportNewCollectorInfo, which takes a
+// ReportNewCollectorInfoRequestProto and returns a
+// ReportNewCollectorInfoResponseProto. Both message types are declared in
+// yarn_server_common_service_protos.proto.
+service CollectorNodemanagerProtocolService {
+ rpc reportNewCollectorInfo (ReportNewCollectorInfoRequestProto) returns (ReportNewCollectorInfoResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 3b03f58..0086bae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -47,7 +47,7 @@ message NodeHeartbeatRequestProto {
optional NodeStatusProto node_status = 1;
optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3;
- repeated AppAggregatorsMapProto registered_aggregators = 4;
+ repeated AppCollectorsMapProto registered_collectors = 4;
}
message NodeHeartbeatResponseProto {
@@ -61,7 +61,7 @@ message NodeHeartbeatResponseProto {
optional string diagnostics_message = 8;
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
- repeated AppAggregatorsMapProto app_aggregators_map = 11;
+ repeated AppCollectorsMapProto app_collectors_map = 11;
}
message SystemCredentialsForAppsProto {
@@ -70,21 +70,21 @@ message SystemCredentialsForAppsProto {
}
////////////////////////////////////////////////////////////////////////
-////// From aggregator_nodemanager_protocol ////////////////////////////
+////// From collector_nodemanager_protocol ////////////////////////////
////////////////////////////////////////////////////////////////////////
-message AppAggregatorsMapProto {
+message AppCollectorsMapProto {
optional ApplicationIdProto appId = 1;
- optional string appAggregatorAddr = 2;
+ optional string appCollectorAddr = 2;
}
//////////////////////////////////////////////////////
-/////// aggregator_nodemanager_protocol //////////////
+/////// collector_nodemanager_protocol //////////////
//////////////////////////////////////////////////////
-message ReportNewAggregatorsInfoRequestProto {
- repeated AppAggregatorsMapProto app_aggregators = 1;
+message ReportNewCollectorInfoRequestProto {
+ repeated AppCollectorsMapProto app_collectors = 1;
}
-message ReportNewAggregatorsInfoResponseProto {
+message ReportNewCollectorInfoResponseProto {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index af9d60f..cfc3dc6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -59,10 +59,10 @@ import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -72,15 +72,15 @@ public class TestRPC {
private static final String EXCEPTION_MSG = "test error";
private static final String EXCEPTION_CAUSE = "exception cause";
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
- public static final String ILLEGAL_NUMBER_MESSAGE =
- "aggregators' number in ReportNewAggregatorsInfoRequest is not ONE.";
-
- public static final String DEFAULT_AGGREGATOR_ADDR = "localhost:0";
-
- public static final ApplicationId DEFAULT_APP_ID =
+
+ public static final String ILLEGAL_NUMBER_MESSAGE =
+ "collectors' number in ReportNewCollectorInfoRequest is not ONE.";
+
+ public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0";
+
+ public static final ApplicationId DEFAULT_APP_ID =
ApplicationId.newInstance(0, 0);
-
+
@Test
public void testUnknownCall() {
Configuration conf = new Configuration();
@@ -112,17 +112,17 @@ public class TestRPC {
server.stop();
}
}
-
+
@Test
- public void testRPCOnAggregatorNodeManagerProtocol() throws IOException {
+ public void testRPCOnCollectorNodeManagerProtocol() throws IOException {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
.getName());
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
- Server server = rpc.getServer(AggregatorNodemanagerProtocol.class,
- new DummyNMAggregatorService(), addr, conf, null, 1);
+ Server server = rpc.getServer(CollectorNodemanagerProtocol.class,
+ new DummyNMCollectorService(), addr, conf, null, 1);
server.start();
// Test unrelated protocol wouldn't get response
@@ -141,31 +141,31 @@ public class TestRPC {
} catch (Exception e) {
e.printStackTrace();
}
-
- // Test AggregatorNodemanagerProtocol get proper response
- AggregatorNodemanagerProtocol proxy = (AggregatorNodemanagerProtocol)rpc.getProxy(
- AggregatorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf);
- // Verify request with DEFAULT_APP_ID and DEFAULT_AGGREGATOR_ADDR get
+
+ // Test CollectorNodemanagerProtocol get proper response
+ CollectorNodemanagerProtocol proxy = (CollectorNodemanagerProtocol)rpc.getProxy(
+ CollectorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf);
+ // Verify request with DEFAULT_APP_ID and DEFAULT_COLLECTOR_ADDR get
// normally response.
try {
- ReportNewAggregatorsInfoRequest request =
- ReportNewAggregatorsInfoRequest.newInstance(
- DEFAULT_APP_ID, DEFAULT_AGGREGATOR_ADDR);
- proxy.reportNewAggregatorInfo(request);
+ ReportNewCollectorInfoRequest request =
+ ReportNewCollectorInfoRequest.newInstance(
+ DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR);
+ proxy.reportNewCollectorInfo(request);
} catch (YarnException e) {
Assert.fail("RPC call failured is not expected here.");
}
-
- // Verify empty request get YarnException back (by design in
- // DummyNMAggregatorService)
+
+ // Verify empty request get YarnException back (by design in
+ // DummyNMCollectorService)
try {
- proxy.reportNewAggregatorInfo(Records
- .newRecord(ReportNewAggregatorsInfoRequest.class));
+ proxy.reportNewCollectorInfo(Records
+ .newRecord(ReportNewCollectorInfoRequest.class));
Assert.fail("Excepted RPC call to fail with YarnException.");
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
}
-
+
server.stop();
}
@@ -173,21 +173,21 @@ public class TestRPC {
public void testHadoopProtoRPC() throws Exception {
test(HadoopYarnProtoRPC.class.getName());
}
-
+
private void test(String rpcClass) throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
- Server server = rpc.getServer(ContainerManagementProtocol.class,
+ Server server = rpc.getServer(ContainerManagementProtocol.class,
new DummyContainerManager(), addr, conf, null, 1);
server.start();
RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class);
- ContainerManagementProtocol proxy = (ContainerManagementProtocol)
- rpc.getProxy(ContainerManagementProtocol.class,
+ ContainerManagementProtocol proxy = (ContainerManagementProtocol)
+ rpc.getProxy(ContainerManagementProtocol.class,
NetUtils.getConnectAddress(server), conf);
- ContainerLaunchContext containerLaunchContext =
+ ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
ApplicationId applicationId = ApplicationId.newInstance(0, 0);
@@ -251,7 +251,7 @@ public class TestRPC {
public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request)
throws YarnException {
- GetContainerStatusesResponse response =
+ GetContainerStatusesResponse response =
recordFactory.newRecordInstance(GetContainerStatusesResponse.class);
response.setContainerStatuses(statuses);
return response;
@@ -283,9 +283,9 @@ public class TestRPC {
}
@Override
- public StopContainersResponse stopContainers(StopContainersRequest request)
+ public StopContainersResponse stopContainers(StopContainersRequest request)
throws YarnException {
- Exception e = new Exception(EXCEPTION_MSG,
+ Exception e = new Exception(EXCEPTION_MSG,
new Exception(EXCEPTION_CAUSE));
throw new YarnException(e);
}
@@ -314,32 +314,32 @@ public class TestRPC {
.buildTokenService(addr).toString());
return containerToken;
}
-
- // A dummy implementation for AggregatorNodemanagerProtocol for test purpose,
- // it only can accept one appID, aggregatorAddr pair or throw exceptions
- public class DummyNMAggregatorService
- implements AggregatorNodemanagerProtocol {
-
+
+ // A dummy implementation for CollectorNodemanagerProtocol for test purpose,
+ // it only can accept one appID, collectorAddr pair or throw exceptions
+ public class DummyNMCollectorService
+ implements CollectorNodemanagerProtocol {
+
@Override
- public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
- ReportNewAggregatorsInfoRequest request)
+ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+ ReportNewCollectorInfoRequest request)
throws YarnException, IOException {
- List<AppAggregatorsMap> appAggregators = request.getAppAggregatorsList();
- if (appAggregators.size() == 1) {
- // check default appID and aggregatorAddr
- AppAggregatorsMap appAggregator = appAggregators.get(0);
- Assert.assertEquals(appAggregator.getApplicationId(),
+ List<AppCollectorsMap> appCollectors = request.getAppCollectorsList();
+ if (appCollectors.size() == 1) {
+ // check default appID and collectorAddr
+ AppCollectorsMap appCollector = appCollectors.get(0);
+ Assert.assertEquals(appCollector.getApplicationId(),
DEFAULT_APP_ID);
- Assert.assertEquals(appAggregator.getAggregatorAddr(),
- DEFAULT_AGGREGATOR_ADDR);
+ Assert.assertEquals(appCollector.getCollectorAddr(),
+ DEFAULT_COLLECTOR_ADDR);
} else {
throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
}
-
- ReportNewAggregatorsInfoResponse response =
- recordFactory.newRecordInstance(ReportNewAggregatorsInfoResponse.class);
+
+ ReportNewCollectorInfoResponse response =
+ recordFactory.newRecordInstance(ReportNewCollectorInfoResponse.class);
return response;
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index 47cf8ad..aa7afcc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -91,14 +91,14 @@ public class TestYarnServerApiClasses {
original.setLastKnownContainerTokenMasterKey(getMasterKey());
original.setLastKnownNMTokenMasterKey(getMasterKey());
original.setNodeStatus(getNodeStatus());
- Map<ApplicationId, String> aggregators = getAggregators();
- original.setRegisteredAggregators(aggregators);
+ Map<ApplicationId, String> collectors = getCollectors();
+ original.setRegisteredCollectors(collectors);
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
original.getProto());
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
- assertEquals(aggregators, copy.getRegisteredAggregators());
+ assertEquals(collectors, copy.getRegisteredCollectors());
}
/**
@@ -115,8 +115,8 @@ public class TestYarnServerApiClasses {
original.setNextHeartBeatInterval(1000);
original.setNodeAction(NodeAction.NORMAL);
original.setResponseId(100);
- Map<ApplicationId, String> aggregators = getAggregators();
- original.setAppAggregatorsMap(aggregators);
+ Map<ApplicationId, String> collectors = getCollectors();
+ original.setAppCollectorsMap(collectors);
NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
original.getProto());
@@ -126,7 +126,7 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
- assertEquals(aggregators, copy.getAppAggregatorsMap());
+ assertEquals(collectors, copy.getAppCollectorsMap());
}
/**
@@ -216,15 +216,15 @@ public class TestYarnServerApiClasses {
}
- private Map<ApplicationId, String> getAggregators() {
+ private Map<ApplicationId, String> getCollectors() {
ApplicationId appID = ApplicationId.newInstance(1L, 1);
- String aggregatorAddr = "localhost:0";
- Map<ApplicationId, String> aggregatorMap =
+ String collectorAddr = "localhost:0";
+ Map<ApplicationId, String> collectorMap =
new HashMap<ApplicationId, String>();
- aggregatorMap.put(appID, aggregatorAddr);
- return aggregatorMap;
+ collectorMap.put(appID, collectorAddr);
+ return collectorMap;
}
-
+
private ContainerStatus getContainerStatus(int applicationId,
int containerID, int appAttemptId) {
ContainerStatus status = recordFactory
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 85f3f0d..5c3bffe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -57,19 +57,19 @@ public interface Context {
ConcurrentMap<ApplicationId, Application> getApplications();
Map<ApplicationId, Credentials> getSystemCredentialsForApps();
-
+
/**
- * Get the registered aggregators that located on this NM.
+ * Get the registered collectors that are located on this NM.
* @return registered
*/
- Map<ApplicationId, String> getRegisteredAggregators();
-
+ Map<ApplicationId, String> getRegisteredCollectors();
+
/**
- * Return the known aggregators which get from RM for all active applications
+ * Return the known collectors obtained from the RM for all active applications
* running on this NM.
- * @return known aggregators.
+ * @return known collectors.
*/
- Map<ApplicationId, String> getKnownAggregators();
+ Map<ApplicationId, String> getKnownCollectors();
ConcurrentMap<ContainerId, Container> getContainers();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 10143db..bb2c9f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
-import org.apache.hadoop.yarn.server.nodemanager.aggregatormanager.NMAggregatorService;
+import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -85,7 +85,7 @@ public class NodeManager extends CompositeService
private Context context;
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
- private NMAggregatorService nmAggregatorService;
+ private NMCollectorService nmCollectorService;
private NodeStatusUpdater nodeStatusUpdater;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private NMStateStoreService nmStore = null;
@@ -114,9 +114,9 @@ public class NodeManager extends CompositeService
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, aclsManager, dirsHandler);
}
-
- protected NMAggregatorService createNMAggregatorService(Context context) {
- return new NMAggregatorService(context);
+
+ protected NMCollectorService createNMCollectorService(Context context) {
+ return new NMCollectorService(context);
}
protected WebServer createWebServer(Context nmContext,
@@ -274,9 +274,9 @@ public class NodeManager extends CompositeService
addService(dispatcher);
DefaultMetricsSystem.initialize("NodeManager");
-
- this.nmAggregatorService = createNMAggregatorService(context);
- addService(nmAggregatorService);
+
+ this.nmCollectorService = createNMCollectorService(context);
+ addService(nmCollectorService);
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
@@ -354,11 +354,11 @@ public class NodeManager extends CompositeService
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
-
- protected Map<ApplicationId, String> registeredAggregators =
+
+ protected Map<ApplicationId, String> registeredCollectors =
new ConcurrentHashMap<ApplicationId, String>();
-
- protected Map<ApplicationId, String> knownAggregators =
+
+ protected Map<ApplicationId, String> knownCollectors =
new ConcurrentHashMap<ApplicationId, String>();
private final NMContainerTokenSecretManager containerTokenSecretManager;
@@ -475,30 +475,30 @@ public class NodeManager extends CompositeService
Map<ApplicationId, Credentials> systemCredentials) {
this.systemCredentials = systemCredentials;
}
-
+
@Override
- public Map<ApplicationId, String> getRegisteredAggregators() {
- return this.registeredAggregators;
+ public Map<ApplicationId, String> getRegisteredCollectors() {
+ return this.registeredCollectors;
}
- public void addRegisteredAggregators(
- Map<ApplicationId, String> newRegisteredAggregators) {
- this.registeredAggregators.putAll(newRegisteredAggregators);
- // Update to knownAggregators as well so it can immediately be consumed by
+ public void addRegisteredCollectors(
+ Map<ApplicationId, String> newRegisteredCollectors) {
+ this.registeredCollectors.putAll(newRegisteredCollectors);
+ // Also update knownCollectors so the new entries can immediately be consumed by
// this NM's TimelineClient.
- this.knownAggregators.putAll(newRegisteredAggregators);
+ this.knownCollectors.putAll(newRegisteredCollectors);
}
-
+
@Override
- public Map<ApplicationId, String> getKnownAggregators() {
- return this.knownAggregators;
+ public Map<ApplicationId, String> getKnownCollectors() {
+ return this.knownCollectors;
}
- public void addKnownAggregators(
- Map<ApplicationId, String> knownAggregators) {
- this.knownAggregators.putAll(knownAggregators);
+ public void addKnownCollectors(
+ Map<ApplicationId, String> knownCollectors) {
+ this.knownCollectors.putAll(knownCollectors);
}
-
+
}
@@ -562,10 +562,10 @@ public class NodeManager extends CompositeService
public Context getNMContext() {
return this.context;
}
-
+
// For testing
- NMAggregatorService getNMAggregatorService() {
- return this.nmAggregatorService;
+ NMCollectorService getNMCollectorService() {
+ return this.nmCollectorService;
}
public static void main(String[] args) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index c855833..a251204 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -593,7 +593,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
.getCurrentKey(),
- NodeStatusUpdaterImpl.this.context.getRegisteredAggregators());
+ NodeStatusUpdaterImpl.this.context.getRegisteredCollectors());
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -656,10 +656,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
((NMContext) context)
.setSystemCrendentialsForApps(parseCredentials(systemCredentials));
}
-
- Map<ApplicationId, String> knownAggregators = response.getAppAggregatorsMap();
- ((NodeManager.NMContext)context).addKnownAggregators(knownAggregators);
-
+
+ Map<ApplicationId, String> knownCollectors =
+ response.getAppCollectorsMap();
+ ((NodeManager.NMContext)context).addKnownCollectors(knownCollectors);
+
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
deleted file mode 100644
index 17150ba..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.nodemanager.aggregatormanager;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.CompositeService;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-
-public class NMAggregatorService extends CompositeService implements
- AggregatorNodemanagerProtocol {
-
- private static final Log LOG = LogFactory.getLog(NMAggregatorService.class);
-
- final Context context;
-
- private Server server;
-
- public NMAggregatorService(Context context) {
-
- super(NMAggregatorService.class.getName());
- this.context = context;
- }
-
- @Override
- protected void serviceStart() throws Exception {
- Configuration conf = getConfig();
-
- InetSocketAddress aggregatorServerAddress = conf.getSocketAddr(
- YarnConfiguration.NM_BIND_HOST,
- YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
-
- Configuration serverConf = new Configuration(conf);
-
- // TODO Security settings.
- YarnRPC rpc = YarnRPC.create(conf);
-
- server =
- rpc.getServer(AggregatorNodemanagerProtocol.class, this,
- aggregatorServerAddress, serverConf,
- this.context.getNMTokenSecretManager(),
- conf.getInt(YarnConfiguration.NM_AGGREGATOR_SERVICE_THREAD_COUNT,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT));
-
- server.start();
- // start remaining services
- super.serviceStart();
- LOG.info("NMAggregatorService started at " + aggregatorServerAddress);
- }
-
-
- @Override
- public void serviceStop() throws Exception {
- if (server != null) {
- server.stop();
- }
- // TODO may cleanup app aggregators running on this NM in future.
- super.serviceStop();
- }
-
- @Override
- public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
- ReportNewAggregatorsInfoRequest request) throws IOException {
- List<AppAggregatorsMap> newAggregatorsList = request.getAppAggregatorsList();
- if (newAggregatorsList != null && !newAggregatorsList.isEmpty()) {
- Map<ApplicationId, String> newAggregatorsMap =
- new HashMap<ApplicationId, String>();
- for (AppAggregatorsMap aggregator : newAggregatorsList) {
- newAggregatorsMap.put(aggregator.getApplicationId(), aggregator.getAggregatorAddr());
- }
- ((NodeManager.NMContext)context).addRegisteredAggregators(newAggregatorsMap);
- }
-
- return ReportNewAggregatorsInfoResponse.newInstance();
- }
-
-}