You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/03/19 19:33:29 UTC
[2/4] hadoop git commit: YARN-3333. Rename TimelineAggregator etc. to
TimelineCollector. Contributed by Sangjin Lee
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
new file mode 100644
index 0000000..009fa63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.collectormanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+
+public class NMCollectorService extends CompositeService implements
+ CollectorNodemanagerProtocol {
+
+ private static final Log LOG = LogFactory.getLog(NMCollectorService.class);
+
+ final Context context;
+
+ private Server server;
+
+ public NMCollectorService(Context context) {
+
+ super(NMCollectorService.class.getName());
+ this.context = context;
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ Configuration conf = getConfig();
+
+ InetSocketAddress collectorServerAddress = conf.getSocketAddr(
+ YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
+
+ Configuration serverConf = new Configuration(conf);
+
+ // TODO Security settings.
+ YarnRPC rpc = YarnRPC.create(conf);
+
+ server =
+ rpc.getServer(CollectorNodemanagerProtocol.class, this,
+ collectorServerAddress, serverConf,
+ this.context.getNMTokenSecretManager(),
+ conf.getInt(YarnConfiguration.NM_COLLECTOR_SERVICE_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT));
+
+ server.start();
+ // start remaining services
+ super.serviceStart();
+ LOG.info("NMCollectorService started at " + collectorServerAddress);
+ }
+
+
+ @Override
+ public void serviceStop() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ // TODO may cleanup app collectors running on this NM in future.
+ super.serviceStop();
+ }
+
+ @Override
+ public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+ ReportNewCollectorInfoRequest request) throws IOException {
+ List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList();
+ if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
+ Map<ApplicationId, String> newCollectorsMap =
+ new HashMap<ApplicationId, String>();
+ for (AppCollectorsMap collector : newCollectorsList) {
+ newCollectorsMap.put(collector.getApplicationId(), collector.getCollectorAddr());
+ }
+ ((NodeManager.NMContext)context).addRegisteredCollectors(newCollectorsMap);
+ }
+
+ return ReportNewCollectorInfoResponse.newInstance();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 6bf3bbf..5f84b4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -425,10 +425,11 @@ public class ApplicationImpl implements Application {
new LogHandlerAppFinishedEvent(app.appId));
app.context.getNMTokenSecretManager().appFinished(app.getAppId());
- // Remove aggregator info for finished apps.
- // TODO check we remove related aggregators info in failure cases (YARN-3038)
- app.context.getRegisteredAggregators().remove(app.getAppId());
- app.context.getKnownAggregators().remove(app.getAppId());
+ // Remove collectors info for finished apps.
+ // TODO check we remove related collectors info in failure cases
+ // (YARN-3038)
+ app.context.getRegisteredCollectors().remove(app.getAppId());
+ app.context.getKnownCollectors().remove(app.getAppId());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 2eb1a7f..b1f0472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -344,8 +344,8 @@ public class ApplicationMasterService extends AbstractService implements
RMApp rmApp =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
- // Remove aggregator address when app get finished.
- rmApp.removeAggregatorAddr();
+ // Remove collector address when app get finished.
+ rmApp.removeCollectorAddr();
// checking whether the app exits in RMStateStore at first not to throw
// ApplicationDoesNotExistInCacheException before and after
// RM work-preserving restart.
@@ -578,10 +578,10 @@ public class ApplicationMasterService extends AbstractService implements
allocateResponse.setAvailableResources(allocation.getResourceLimit());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-
- // add aggregator address for this application
- allocateResponse.setAggregatorAddr(
- this.rmContext.getRMApps().get(applicationId).getAggregatorAddr());
+
+ // add collector address for this application
+ allocateResponse.setCollectorAddr(
+ this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
// add preemption to the allocateResponse message (if any)
allocateResponse
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index f163a28..16aae40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -410,11 +410,11 @@ public class ResourceTrackerService extends AbstractService implements
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
return resync;
}
-
- // Check & update aggregators info from request.
+
+ // Check & update collectors info from request.
// TODO make sure it won't have race condition issue for AM failed over case
// that the older registration could possible override the newer one.
- updateAppAggregatorsMap(request);
+ updateAppCollectorsMap(request);
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@@ -430,13 +430,14 @@ public class ResourceTrackerService extends AbstractService implements
if (!systemCredentials.isEmpty()) {
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
-
- // Return aggregators' map that NM needs to know
- // TODO we should optimize this to only include aggreator info that NM
+
+ // Return collectors' map that NM needs to know
+ // TODO we should optimize this to only include collector info that NM
// doesn't know yet.
- List<ApplicationId> keepAliveApps = remoteNodeStatus.getKeepAliveApplications();
+ List<ApplicationId> keepAliveApps =
+ remoteNodeStatus.getKeepAliveApplications();
if (keepAliveApps != null) {
- setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
+ setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
}
// 4. Send status to RMNode, saving the latest response.
@@ -447,48 +448,49 @@ public class ResourceTrackerService extends AbstractService implements
return nodeHeartBeatResponse;
}
-
- private void setAppAggregatorsMapToResponse(
+
+ private void setAppCollectorsMapToResponse(
List<ApplicationId> liveApps, NodeHeartbeatResponse response) {
- Map<ApplicationId, String> liveAppAggregatorsMap = new
+ Map<ApplicationId, String> liveAppCollectorsMap = new
ConcurrentHashMap<ApplicationId, String>();
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
for (ApplicationId appId : liveApps) {
- String appAggregatorAddr = rmApps.get(appId).getAggregatorAddr();
- if (appAggregatorAddr != null) {
- liveAppAggregatorsMap.put(appId, appAggregatorAddr);
+ String appCollectorAddr = rmApps.get(appId).getCollectorAddr();
+ if (appCollectorAddr != null) {
+ liveAppCollectorsMap.put(appId, appCollectorAddr);
} else {
- // Log a debug info if aggregator address is not found.
+ // Log a debug info if collector address is not found.
if (LOG.isDebugEnabled()) {
- LOG.debug("Aggregator for applicaton: " + appId + " hasn't registered yet!");
+ LOG.debug("Collector for applicaton: " + appId +
+ " hasn't registered yet!");
}
}
}
- response.setAppAggregatorsMap(liveAppAggregatorsMap);
+ response.setAppCollectorsMap(liveAppCollectorsMap);
}
-
- private void updateAppAggregatorsMap(NodeHeartbeatRequest request) {
- Map<ApplicationId, String> registeredAggregatorsMap =
- request.getRegisteredAggregators();
- if (registeredAggregatorsMap != null
- && !registeredAggregatorsMap.isEmpty()) {
+
+ private void updateAppCollectorsMap(NodeHeartbeatRequest request) {
+ Map<ApplicationId, String> registeredCollectorsMap =
+ request.getRegisteredCollectors();
+ if (registeredCollectorsMap != null
+ && !registeredCollectorsMap.isEmpty()) {
Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
- for (Map.Entry<ApplicationId, String> entry:
- registeredAggregatorsMap.entrySet()) {
+ for (Map.Entry<ApplicationId, String> entry:
+ registeredCollectorsMap.entrySet()) {
ApplicationId appId = entry.getKey();
- String aggregatorAddr = entry.getValue();
- if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) {
+ String collectorAddr = entry.getValue();
+ if (collectorAddr != null && !collectorAddr.isEmpty()) {
RMApp rmApp = rmApps.get(appId);
if (rmApp == null) {
- LOG.warn("Cannot update aggregator info because application ID: " +
+ LOG.warn("Cannot update collector info because application ID: " +
appId + " is not found in RMContext!");
} else {
- String previousAggregatorAddr = rmApp.getAggregatorAddr();
- if (previousAggregatorAddr == null ||
- previousAggregatorAddr != aggregatorAddr) {
- // sending aggregator update event.
- RMAppAggregatorUpdateEvent event =
- new RMAppAggregatorUpdateEvent(appId, aggregatorAddr);
+ String previousCollectorAddr = rmApp.getCollectorAddr();
+ if (previousCollectorAddr == null ||
+ previousCollectorAddr != collectorAddr) {
+ // sending collector update event.
+ RMAppCollectorUpdateEvent event =
+ new RMAppCollectorUpdateEvent(appId, collectorAddr);
rmContext.getDispatcher().getEventHandler().handle(event);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index f81edb2..32cf2cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -172,23 +172,23 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return the tracking url for the application master.
*/
String getTrackingUrl();
-
+
/**
- * The aggregator address for the application.
- * @return the address for the application's aggregator.
+ * The collector address for the application.
+ * @return the address for the application's collector.
*/
- String getAggregatorAddr();
-
+ String getCollectorAddr();
+
/**
- * Set aggregator address for the application
- * @param aggregatorAddr the address of aggregator
+ * Set collector address for the application
+ * @param collectorAddr the address of collector
*/
- void setAggregatorAddr(String aggregatorAddr);
-
+ void setCollectorAddr(String collectorAddr);
+
/**
- * Remove aggregator address when application is finished or killed.
+ * Remove collector address when application is finished or killed.
*/
- void removeAggregatorAddr();
+ void removeCollectorAddr();
/**
* The original tracking url for the application master.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
deleted file mode 100644
index b43de44..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-public class RMAppAggregatorUpdateEvent extends RMAppEvent {
-
- private final String appAggregatorAddr;
-
- public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) {
- super(appId, RMAppEventType.AGGREGATOR_UPDATE);
- this.appAggregatorAddr = appAggregatorAddr;
- }
-
- public String getAppAggregatorAddr(){
- return this.appAggregatorAddr;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
new file mode 100644
index 0000000..698c9b5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class RMAppCollectorUpdateEvent extends RMAppEvent {
+
+ private final String appCollectorAddr;
+
+ public RMAppCollectorUpdateEvent(ApplicationId appId,
+ String appCollectorAddr) {
+ super(appId, RMAppEventType.COLLECTOR_UPDATE);
+ this.appCollectorAddr = appCollectorAddr;
+ }
+
+ public String getAppCollectorAddr(){
+ return this.appCollectorAddr;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index 6e9460a..2b42638 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -30,9 +30,9 @@ public enum RMAppEventType {
// Source: Scheduler
APP_ACCEPTED,
-
+
// TODO add source later
- AGGREGATOR_UPDATE,
+ COLLECTOR_UPDATE,
// Source: RMAppAttempt
ATTEMPT_REGISTERED,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 6a076ac..61c5748 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -134,7 +134,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private long startTime;
private long finishTime = 0;
private long storedFinishTime = 0;
- private String aggregatorAddr;
+ private String collectorAddr;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
private String queue;
@@ -167,7 +167,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@@ -185,7 +185,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@@ -205,7 +205,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(
@@ -223,7 +223,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED)
.addTransition(RMAppState.ACCEPTED,
@@ -251,7 +251,7 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.MOVE, new RMAppMoveTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition(
@@ -282,7 +282,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -295,7 +295,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -308,7 +308,7 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
- RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
@@ -505,20 +505,20 @@ public class RMAppImpl implements RMApp, Recoverable {
public void setQueue(String queue) {
this.queue = queue;
}
-
+
@Override
- public String getAggregatorAddr() {
- return this.aggregatorAddr;
+ public String getCollectorAddr() {
+ return this.collectorAddr;
}
-
+
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
- this.aggregatorAddr = aggregatorAddr;
+ public void setCollectorAddr(String collectorAddr) {
+ this.collectorAddr = collectorAddr;
}
-
+
@Override
- public void removeAggregatorAddr() {
- this.aggregatorAddr = null;
+ public void removeCollectorAddr() {
+ this.collectorAddr = null;
}
@Override
@@ -769,8 +769,8 @@ public class RMAppImpl implements RMApp, Recoverable {
this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
- //TODO recover aggregator address.
- //this.aggregatorAddr = appState.getAggregatorAddr();
+ //TODO recover collector address.
+ //this.collectorAddr = appState.getCollectorAddr();
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
@@ -814,22 +814,22 @@ public class RMAppImpl implements RMApp, Recoverable {
};
}
- private static final class RMAppAggregatorUpdateTransition
+ private static final class RMAppCollectorUpdateTransition
extends RMAppTransition {
-
+
public void transition(RMAppImpl app, RMAppEvent event) {
- LOG.info("Updating aggregator info for app: " + app.getApplicationId());
-
- RMAppAggregatorUpdateEvent appAggregatorUpdateEvent =
- (RMAppAggregatorUpdateEvent) event;
- // Update aggregator address
- app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr());
-
+ LOG.info("Updating collector info for app: " + app.getApplicationId());
+
+ RMAppCollectorUpdateEvent appCollectorUpdateEvent =
+ (RMAppCollectorUpdateEvent) event;
+ // Update collector address
+ app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
+
// TODO persistent to RMStateStore for recover
// Save to RMStateStore
};
}
-
+
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 0d0895a..2e1a27e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -92,15 +92,15 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public String getAggregatorAddr() {
+ public String getCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
+ public void setCollectorAddr(String collectorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public void removeAggregatorAddr() {
+ public void removeCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 96952d2..58dcacf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -273,17 +273,17 @@ public class MockRMApp implements RMApp {
}
@Override
- public String getAggregatorAddr() {
+ public String getCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
-
+
@Override
- public void removeAggregatorAddr() {
+ public void removeCollectorAddr() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public void setAggregatorAddr(String aggregatorAddr) {
+ public void setCollectorAddr(String collectorAddr) {
throw new UnsupportedOperationException("Not supported yet.");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
index ae5efa5..1fef76e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
@@ -56,6 +56,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 32ee5d8..fab131c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -1,25 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.yarn.server.timelineservice;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.junit.Assert.fail;
-
public class TestTimelineServiceClientIntegration {
- private static PerNodeTimelineAggregatorsAuxService auxService;
+ private static TimelineCollectorManager collectorManager;
+ private static PerNodeTimelineCollectorsAuxService auxService;
@BeforeClass
public static void setupClass() throws Exception {
try {
- auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
+ collectorManager = new MyTimelineCollectorManager();
+ auxService =
+ PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
+ collectorManager);
auxService.addApplication(ApplicationId.newInstance(0, 1));
} catch (ExitUtil.ExitException e) {
fail();
@@ -38,6 +63,9 @@ public class TestTimelineServiceClientIntegration {
TimelineClient client =
TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
try {
+ // set the timeline service address manually
+ client.setTimelineServiceAddress(
+ collectorManager.getRestServerBindAddress());
client.init(new YarnConfiguration());
client.start();
TimelineEntity entity = new TimelineEntity();
@@ -45,10 +73,20 @@ public class TestTimelineServiceClientIntegration {
entity.setId("test entity id");
client.putEntities(entity);
client.putEntitiesAsync(entity);
- } catch(Exception e) {
- fail();
} finally {
client.stop();
}
}
+
+ private static class MyTimelineCollectorManager extends
+ TimelineCollectorManager {
+ public MyTimelineCollectorManager() {
+ super();
+ }
+
+ @Override
+ protected CollectorNodemanagerProtocol getNMCollectorService() {
+ return mock(CollectorNodemanagerProtocol.class);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index 26790f1..f974aee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -58,6 +58,11 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
</dependency>
@@ -72,6 +77,11 @@
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
deleted file mode 100644
index 95ec9f8..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage for a given YARN application.
- *
- * App-related lifecycle management is handled by this service.
- */
-@Private
-@Unstable
-public class AppLevelTimelineAggregator extends TimelineAggregator {
- private final String applicationId;
- // TODO define key metadata such as flow metadata, user, and queue
-
- public AppLevelTimelineAggregator(String applicationId) {
- super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId);
- this.applicationId = applicationId;
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- super.serviceStop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
deleted file mode 100644
index 19920fd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.nio.ByteBuffer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.hadoop.yarn.server.api.ContainerContext;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * The top-level server for the per-node timeline aggregator collection. Currently
- * it is defined as an auxiliary service to accommodate running within another
- * daemon (e.g. node manager).
- */
-@Private
-@Unstable
-public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
- private static final Log LOG =
- LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class);
- private static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
- private final TimelineAggregatorsCollection aggregatorCollection;
-
- public PerNodeTimelineAggregatorsAuxService() {
- // use the same singleton
- this(TimelineAggregatorsCollection.getInstance());
- }
-
- @VisibleForTesting PerNodeTimelineAggregatorsAuxService(
- TimelineAggregatorsCollection aggregatorCollection) {
- super("timeline_aggregator");
- this.aggregatorCollection = aggregatorCollection;
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- aggregatorCollection.init(conf);
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- aggregatorCollection.start();
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- aggregatorCollection.stop();
- super.serviceStop();
- }
-
- // these methods can be used as the basis for future service methods if the
- // per-node aggregator runs separate from the node manager
- /**
- * Creates and adds an app level aggregator for the specified application id.
- * The aggregator is also initialized and started. If the service already
- * exists, no new service is created.
- *
- * @return whether it was added successfully
- */
- public boolean addApplication(ApplicationId appId) {
- AppLevelTimelineAggregator aggregator =
- new AppLevelTimelineAggregator(appId.toString());
- return (aggregatorCollection.putIfAbsent(appId, aggregator)
- == aggregator);
- }
-
- /**
- * Removes the app level aggregator for the specified application id. The
- * aggregator is also stopped as a result. If the aggregator does not exist, no
- * change is made.
- *
- * @return whether it was removed successfully
- */
- public boolean removeApplication(ApplicationId appId) {
- String appIdString = appId.toString();
- return aggregatorCollection.remove(appIdString);
- }
-
- /**
- * Creates and adds an app level aggregator for the specified application id.
- * The aggregator is also initialized and started. If the aggregator already
- * exists, no new aggregator is created.
- */
- @Override
- public void initializeContainer(ContainerInitializationContext context) {
- // intercept the event of the AM container being created and initialize the
- // app level aggregator service
- if (isApplicationMaster(context)) {
- ApplicationId appId = context.getContainerId().
- getApplicationAttemptId().getApplicationId();
- addApplication(appId);
- }
- }
-
- /**
- * Removes the app level aggregator for the specified application id. The
- * aggregator is also stopped as a result. If the aggregator does not exist, no
- * change is made.
- */
- @Override
- public void stopContainer(ContainerTerminationContext context) {
- // intercept the event of the AM container being stopped and remove the app
- // level aggregator service
- if (isApplicationMaster(context)) {
- ApplicationId appId = context.getContainerId().
- getApplicationAttemptId().getApplicationId();
- removeApplication(appId);
- }
- }
-
- private boolean isApplicationMaster(ContainerContext context) {
- // TODO this is based on a (shaky) assumption that the container id (the
- // last field of the full container id) for an AM is always 1
- // we want to make this much more reliable
- ContainerId containerId = context.getContainerId();
- return containerId.getContainerId() == 1L;
- }
-
- @VisibleForTesting
- boolean hasApplication(String appId) {
- return aggregatorCollection.containsKey(appId);
- }
-
- @Override
- public void initializeApplication(ApplicationInitializationContext context) {
- }
-
- @Override
- public void stopApplication(ApplicationTerminationContext context) {
- }
-
- @Override
- public ByteBuffer getMetaData() {
- // TODO currently it is not used; we can return a more meaningful data when
- // we connect it with an AM
- return ByteBuffer.allocate(0);
- }
-
- @VisibleForTesting
- public static PerNodeTimelineAggregatorsAuxService launchServer(String[] args) {
- Thread
- .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
- StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, args,
- LOG);
- PerNodeTimelineAggregatorsAuxService auxService = null;
- try {
- auxService = new PerNodeTimelineAggregatorsAuxService();
- ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
- SHUTDOWN_HOOK_PRIORITY);
- YarnConfiguration conf = new YarnConfiguration();
- auxService.init(conf);
- auxService.start();
- } catch (Throwable t) {
- LOG.fatal("Error starting PerNodeAggregatorServer", t);
- ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
- }
- return auxService;
- }
-
- private static class ShutdownHook implements Runnable {
- private final PerNodeTimelineAggregatorsAuxService auxService;
-
- public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) {
- this.auxService = auxService;
- }
-
- public void run() {
- auxService.stop();
- }
- }
-
- public static void main(String[] args) {
- launchServer(args);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
deleted file mode 100644
index dbd0895..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
-import org.apache.hadoop.util.ReflectionUtils;
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage.
- *
- * Classes that extend this can add their own lifecycle management or
- * customization of request handling.
- */
-@Private
-@Unstable
-public abstract class TimelineAggregator extends CompositeService {
- private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
-
- private TimelineWriter writer;
-
- public TimelineAggregator(String name) {
- super(name);
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- writer = ReflectionUtils.newInstance(conf.getClass(
- YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
- FileSystemTimelineWriterImpl.class,
- TimelineWriter.class), conf);
- writer.init(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- super.serviceStop();
- writer.stop();
- }
-
- public TimelineWriter getWriter() {
- return writer;
- }
-
- /**
- * Handles entity writes. These writes are synchronous and are written to the
- * backing storage without buffering/batching. If any entity already exists,
- * it results in an update of the entity.
- *
- * This method should be reserved for selected critical entities and events.
- * For normal voluminous writes one should use the async method
- * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
- *
- * @param entities entities to post
- * @param callerUgi the caller UGI
- * @return the response that contains the result of the post.
- */
- public TimelineWriteResponse postEntities(TimelineEntities entities,
- UserGroupInformation callerUgi) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
- LOG.debug("postEntities(entities=" + entities + ", callerUgi="
- + callerUgi + ")");
- }
-
- return writer.write(entities);
- }
-
- /**
- * Handles entity writes in an asynchronous manner. The method returns as soon
- * as validation is done. No promises are made on how quickly it will be
- * written to the backing storage or if it will always be written to the
- * backing storage. Multiple writes to the same entities may be batched and
- * appropriate values updated and result in fewer writes to the backing
- * storage.
- *
- * @param entities entities to post
- * @param callerUgi the caller UGI
- */
- public void postEntitiesAsync(TimelineEntities entities,
- UserGroupInformation callerUgi) {
- // TODO implement
- if (LOG.isDebugEnabled()) {
- LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
- callerUgi + ")");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
deleted file mode 100644
index 7d42f94..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.*;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.webapp.ForbiddenException;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
-
-import com.google.inject.Singleton;
-
-/**
- * The main per-node REST end point for timeline service writes. It is
- * essentially a container service that routes requests to the appropriate
- * per-app services.
- */
-@Private
-@Unstable
-@Singleton
-@Path("/ws/v2/timeline")
-public class TimelineAggregatorWebService {
- private static final Log LOG =
- LogFactory.getLog(TimelineAggregatorWebService.class);
-
- private @Context ServletContext context;
-
- @XmlRootElement(name = "about")
- @XmlAccessorType(XmlAccessType.NONE)
- @Public
- @Unstable
- public static class AboutInfo {
-
- private String about;
-
- public AboutInfo() {
-
- }
-
- public AboutInfo(String about) {
- this.about = about;
- }
-
- @XmlElement(name = "About")
- public String getAbout() {
- return about;
- }
-
- public void setAbout(String about) {
- this.about = about;
- }
-
- }
-
- /**
- * Return the description of the timeline web services.
- */
- @GET
- @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public AboutInfo about(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res) {
- init(res);
- return new AboutInfo("Timeline API");
- }
-
- /**
- * Accepts writes to the aggregator, and returns a response. It simply routes
- * the request to the app level aggregator. It expects an application as a
- * context.
- */
- @PUT
- @Path("/entities")
- @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public Response putEntities(
- @Context HttpServletRequest req,
- @Context HttpServletResponse res,
- @QueryParam("async") String async,
- @QueryParam("appid") String appId,
- TimelineEntities entities) {
- init(res);
- UserGroupInformation callerUgi = getUser(req);
- if (callerUgi == null) {
- String msg = "The owner of the posted timeline entities is not set";
- LOG.error(msg);
- throw new ForbiddenException(msg);
- }
-
- // TODO how to express async posts and handle them
- boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
-
- try {
- appId = parseApplicationId(appId);
- if (appId == null) {
- return Response.status(Response.Status.BAD_REQUEST).build();
- }
- TimelineAggregator service = getAggregatorService(req, appId);
- if (service == null) {
- LOG.error("Application not found");
- throw new NotFoundException(); // different exception?
- }
- service.postEntities(entities, callerUgi);
- return Response.ok().build();
- } catch (Exception e) {
- LOG.error("Error putting entities", e);
- throw new WebApplicationException(e,
- Response.Status.INTERNAL_SERVER_ERROR);
- }
- }
-
- private String parseApplicationId(String appId) {
- // Make sure the appId is not null and is valid
- ApplicationId appID;
- try {
- if (appId != null) {
- return ConverterUtils.toApplicationId(appId.trim()).toString();
- } else {
- return null;
- }
- } catch (Exception e) {
- return null;
- }
- }
-
- private TimelineAggregator
- getAggregatorService(HttpServletRequest req, String appIdToParse) {
- String appIdString = parseApplicationId(appIdToParse);
- final TimelineAggregatorsCollection aggregatorCollection =
- (TimelineAggregatorsCollection) context.getAttribute(
- TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY);
- return aggregatorCollection.get(appIdString);
- }
-
- private void init(HttpServletResponse response) {
- response.setContentType(null);
- }
-
- private UserGroupInformation getUser(HttpServletRequest req) {
- String remoteUser = req.getRemoteUser();
- UserGroupInformation callerUgi = null;
- if (remoteUser != null) {
- callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
- }
- return callerUgi;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
deleted file mode 100644
index d6e2a18..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.HttpServer2;
-import org.apache.hadoop.http.lib.StaticUserWebFilter;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
-
-/**
- * Class that manages adding and removing aggregators and their lifecycle. It
- * provides thread safety access to the aggregators inside.
- *
- * It is a singleton, and instances should be obtained via
- * {@link #getInstance()}.
- */
-@Private
-@Unstable
-public class TimelineAggregatorsCollection extends CompositeService {
- private static final Log LOG =
- LogFactory.getLog(TimelineAggregatorsCollection.class);
- private static final TimelineAggregatorsCollection INSTANCE =
- new TimelineAggregatorsCollection();
-
- // access to this map is synchronized with the map itself
- private final Map<String, TimelineAggregator> aggregators =
- Collections.synchronizedMap(
- new HashMap<String, TimelineAggregator>());
-
- // REST server for this aggregator collection
- private HttpServer2 timelineRestServer;
-
- private String timelineRestServerBindAddress;
-
- private AggregatorNodemanagerProtocol nmAggregatorService;
-
- private InetSocketAddress nmAggregatorServiceAddress;
-
- static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
-
- static TimelineAggregatorsCollection getInstance() {
- return INSTANCE;
- }
-
- TimelineAggregatorsCollection() {
- super(TimelineAggregatorsCollection.class.getName());
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- this.nmAggregatorServiceAddress = conf.getSocketAddr(
- YarnConfiguration.NM_BIND_HOST,
- YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
- YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
-
- }
-
- @Override
- protected void serviceStart() throws Exception {
- startWebApp();
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- if (timelineRestServer != null) {
- timelineRestServer.stop();
- }
- super.serviceStop();
- }
-
- /**
- * Put the aggregator into the collection if an aggregator mapped by id does
- * not exist.
- *
- * @throws YarnRuntimeException if there was any exception in initializing and
- * starting the app level service
- * @return the aggregator associated with id after the potential put.
- */
- public TimelineAggregator putIfAbsent(ApplicationId appId,
- TimelineAggregator aggregator) {
- String id = appId.toString();
- TimelineAggregator aggregatorInTable;
- boolean aggregatorIsNew = false;
- synchronized (aggregators) {
- aggregatorInTable = aggregators.get(id);
- if (aggregatorInTable == null) {
- try {
- // initialize, start, and add it to the collection so it can be
- // cleaned up when the parent shuts down
- aggregator.init(getConfig());
- aggregator.start();
- aggregators.put(id, aggregator);
- LOG.info("the aggregator for " + id + " was added");
- aggregatorInTable = aggregator;
- aggregatorIsNew = true;
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
- }
- } else {
- String msg = "the aggregator for " + id + " already exists!";
- LOG.error(msg);
- }
-
- }
- // Report to NM if a new aggregator is added.
- if (aggregatorIsNew) {
- try {
- reportNewAggregatorToNM(appId);
- } catch (Exception e) {
- // throw exception here as it cannot be used if failed report to NM
- LOG.error("Failed to report a new aggregator for application: " + appId +
- " to NM Aggregator Services.");
- throw new YarnRuntimeException(e);
- }
- }
-
- return aggregatorInTable;
- }
-
- /**
- * Removes the aggregator for the specified id. The aggregator is also stopped
- * as a result. If the aggregator does not exist, no change is made.
- *
- * @return whether it was removed successfully
- */
- public boolean remove(String id) {
- synchronized (aggregators) {
- TimelineAggregator aggregator = aggregators.remove(id);
- if (aggregator == null) {
- String msg = "the aggregator for " + id + " does not exist!";
- LOG.error(msg);
- return false;
- } else {
- // stop the service to do clean up
- aggregator.stop();
- LOG.info("the aggregator service for " + id + " was removed");
- return true;
- }
- }
- }
-
- /**
- * Returns the aggregator for the specified id.
- *
- * @return the aggregator or null if it does not exist
- */
- public TimelineAggregator get(String id) {
- return aggregators.get(id);
- }
-
- /**
- * Returns whether the aggregator for the specified id exists in this
- * collection.
- */
- public boolean containsKey(String id) {
- return aggregators.containsKey(id);
- }
-
- /**
- * Launch the REST web server for this aggregator collection
- */
- private void startWebApp() {
- Configuration conf = getConfig();
- // use the same ports as the old ATS for now; we could create new properties
- // for the new timeline service if needed
- String bindAddress = WebAppUtils.getWebAppBindURL(conf,
- YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
- WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
- this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
- NetUtils.createSocketAddr(bindAddress));
- LOG.info("Instantiating the per-node aggregator webapp at " +
- timelineRestServerBindAddress);
- try {
- Configuration confForInfoServer = new Configuration(conf);
- confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
- HttpServer2.Builder builder = new HttpServer2.Builder()
- .setName("timeline")
- .setConf(conf)
- .addEndpoint(URI.create("http://" + bindAddress));
- timelineRestServer = builder.build();
- // TODO: replace this by an authentication filter in future.
- HashMap<String, String> options = new HashMap<>();
- String username = conf.get(HADOOP_HTTP_STATIC_USER,
- DEFAULT_HADOOP_HTTP_STATIC_USER);
- options.put(HADOOP_HTTP_STATIC_USER, username);
- HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
- "static_user_filter_timeline",
- StaticUserWebFilter.StaticUserFilter.class.getName(),
- options, new String[] {"/*"});
-
- timelineRestServer.addJerseyResourcePackage(
- TimelineAggregatorWebService.class.getPackage().getName() + ";"
- + GenericExceptionHandler.class.getPackage().getName() + ";"
- + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
- "/*");
- timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
- TimelineAggregatorsCollection.getInstance());
- timelineRestServer.start();
- } catch (Exception e) {
- String msg = "The per-node aggregator webapp failed to start.";
- LOG.error(msg, e);
- throw new YarnRuntimeException(msg, e);
- }
- }
-
- private void reportNewAggregatorToNM(ApplicationId appId)
- throws YarnException, IOException {
- this.nmAggregatorService = getNMAggregatorService();
- ReportNewAggregatorsInfoRequest request =
- ReportNewAggregatorsInfoRequest.newInstance(appId,
- this.timelineRestServerBindAddress);
- LOG.info("Report a new aggregator for application: " + appId +
- " to NM Aggregator Services.");
- nmAggregatorService.reportNewAggregatorInfo(request);
- }
-
- // protected for test
- protected AggregatorNodemanagerProtocol getNMAggregatorService(){
- Configuration conf = getConfig();
- final YarnRPC rpc = YarnRPC.create(conf);
-
- // TODO Security settings.
- return (AggregatorNodemanagerProtocol) rpc.getProxy(
- AggregatorNodemanagerProtocol.class,
- nmAggregatorServiceAddress, conf);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
new file mode 100644
index 0000000..7d59876
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage for a given YARN application.
+ *
+ * App-related lifecycle management is handled by this service.
+ */
+@Private
+@Unstable
+public class AppLevelTimelineCollector extends TimelineCollector {
+ private final String applicationId;
+ // TODO define key metadata such as flow metadata, user, and queue
+
+ public AppLevelTimelineCollector(String applicationId) {
+ super(AppLevelTimelineCollector.class.getName() + " - " + applicationId);
+ this.applicationId = applicationId;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+}