You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/03/19 19:33:29 UTC

[2/4] hadoop git commit: YARN-3333. Rename TimelineAggregator etc. to TimelineCollector. Contributed by Sangjin Lee

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
new file mode 100644
index 0000000..009fa63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.collectormanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+/** NM-side RPC server that receives newly reported app collector addresses. */
+public class NMCollectorService extends CompositeService implements
+    CollectorNodemanagerProtocol {
+
+  private static final Log LOG = LogFactory.getLog(NMCollectorService.class);
+
+  final Context context; // NM context; reported collectors are stored in it
+
+  private Server server; // RPC server; null until serviceStart() creates it
+  /** Creates the service, named after this class, around the given context. */
+  public NMCollectorService(Context context) {
+
+    super(NMCollectorService.class.getName());
+    this.context = context;
+  }
+  /** Creates and starts the RPC server, then calls super.serviceStart(). */
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+
+    InetSocketAddress collectorServerAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
+
+    Configuration serverConf = new Configuration(conf);
+
+    // TODO Security settings.
+    YarnRPC rpc = YarnRPC.create(conf);
+    // This object is handed to the RPC server as the protocol implementation.
+    server =
+        rpc.getServer(CollectorNodemanagerProtocol.class, this,
+            collectorServerAddress, serverConf,
+            this.context.getNMTokenSecretManager(),
+            conf.getInt(YarnConfiguration.NM_COLLECTOR_SERVICE_THREAD_COUNT,
+                YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT));
+
+    server.start();
+    // start remaining services
+    super.serviceStart();
+    LOG.info("NMCollectorService started at " + collectorServerAddress);
+  }
+
+  /** Stops the RPC server if one was created, then calls super.serviceStop(). */
+  @Override
+  public void serviceStop() throws Exception {
+    if (server != null) {
+      server.stop();
+    }
+    // TODO may cleanup app collectors running on this NM in future.
+    super.serviceStop();
+  }
+  /** Registers the reported app-to-collector addresses in the NM context. */
+  @Override
+  public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+      ReportNewCollectorInfoRequest request) throws IOException {
+    List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList();
+    if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
+      Map<ApplicationId, String> newCollectorsMap =
+          new HashMap<ApplicationId, String>();
+      for (AppCollectorsMap collector : newCollectorsList) {
+        newCollectorsMap.put(collector.getApplicationId(), collector.getCollectorAddr());
+      }
+      ((NodeManager.NMContext)context).addRegisteredCollectors(newCollectorsMap);
+    }
+    // A response is returned even when the request carried no collectors.
+    return ReportNewCollectorInfoResponse.newInstance();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 6bf3bbf..5f84b4f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -425,10 +425,11 @@ public class ApplicationImpl implements Application {
           new LogHandlerAppFinishedEvent(app.appId));
 
       app.context.getNMTokenSecretManager().appFinished(app.getAppId());
-      // Remove aggregator info for finished apps.
-      // TODO check we remove related aggregators info in failure cases (YARN-3038)
-      app.context.getRegisteredAggregators().remove(app.getAppId());
-      app.context.getKnownAggregators().remove(app.getAppId());
+      // Remove collectors info for finished apps.
+      // TODO check we remove related collectors info in failure cases
+      // (YARN-3038)
+      app.context.getRegisteredCollectors().remove(app.getAppId());
+      app.context.getKnownCollectors().remove(app.getAppId());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 2eb1a7f..b1f0472 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -344,8 +344,8 @@ public class ApplicationMasterService extends AbstractService implements
 
     RMApp rmApp =
         rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
-    // Remove aggregator address when app get finished.
-    rmApp.removeAggregatorAddr();
+    // Remove the collector address when the app finishes.
+    rmApp.removeCollectorAddr();
     // checking whether the app exits in RMStateStore at first not to throw
     // ApplicationDoesNotExistInCacheException before and after
     // RM work-preserving restart.
@@ -578,10 +578,10 @@ public class ApplicationMasterService extends AbstractService implements
       allocateResponse.setAvailableResources(allocation.getResourceLimit());
 
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-      
-      // add aggregator address for this application
-      allocateResponse.setAggregatorAddr(
-          this.rmContext.getRMApps().get(applicationId).getAggregatorAddr());
+
+      // add collector address for this application
+      allocateResponse.setCollectorAddr(
+          this.rmContext.getRMApps().get(applicationId).getCollectorAddr());
 
       // add preemption to the allocateResponse message (if any)
       allocateResponse

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index f163a28..16aae40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -410,11 +410,11 @@ public class ResourceTrackerService extends AbstractService implements
           new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
       return resync;
     }
-    
-    // Check & update aggregators info from request.
+
+    // Check & update collectors info from request.
     // TODO make sure it won't have race condition issue for AM failed over case
     // that the older registration could possible override the newer one.
-    updateAppAggregatorsMap(request);
+    updateAppCollectorsMap(request);
 
     // Heartbeat response
     NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@@ -430,13 +430,14 @@ public class ResourceTrackerService extends AbstractService implements
     if (!systemCredentials.isEmpty()) {
       nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
     }
-    
-    // Return aggregators' map that NM needs to know
-    // TODO we should optimize this to only include aggreator info that NM 
+
+    // Return collectors' map that NM needs to know
+    // TODO we should optimize this to only include collector info that NM
     // doesn't know yet.
-    List<ApplicationId> keepAliveApps = remoteNodeStatus.getKeepAliveApplications();
+    List<ApplicationId> keepAliveApps =
+        remoteNodeStatus.getKeepAliveApplications();
     if (keepAliveApps != null) {
-      setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
+      setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
     }
 
     // 4. Send status to RMNode, saving the latest response.
@@ -447,48 +448,62 @@ public class ResourceTrackerService extends AbstractService implements
 
     return nodeHeartBeatResponse;
   }
-  
-  private void setAppAggregatorsMapToResponse(
+
+  /**
+   * Puts into the heartbeat response the collector address of each given
+   * application that the RM knows and that has a collector address set.
+   */
+  private void setAppCollectorsMapToResponse(
       List<ApplicationId> liveApps, NodeHeartbeatResponse response) {
-    Map<ApplicationId, String> liveAppAggregatorsMap = new 
+    Map<ApplicationId, String> liveAppCollectorsMap = new
         ConcurrentHashMap<ApplicationId, String>();
     Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
       for (ApplicationId appId : liveApps) {
-        String appAggregatorAddr = rmApps.get(appId).getAggregatorAddr();
-        if (appAggregatorAddr != null) {
-          liveAppAggregatorsMap.put(appId, appAggregatorAddr);
+        // An app reported by the NM may be missing from the RM context, as
+        // updateAppCollectorsMap() already allows for; treat it as no address.
+        RMApp rmApp = rmApps.get(appId);
+        String appCollectorAddr =
+            rmApp == null ? null : rmApp.getCollectorAddr();
+        if (appCollectorAddr != null) {
+          liveAppCollectorsMap.put(appId, appCollectorAddr);
         } else {
-          // Log a debug info if aggregator address is not found.
+          // Log a debug info if collector address is not found.
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Aggregator for applicaton: " + appId + " hasn't registered yet!");
+            LOG.debug("Collector for applicaton: " + appId +
+                " hasn't registered yet!");
           }
         }
       }
-    response.setAppAggregatorsMap(liveAppAggregatorsMap);
+    response.setAppCollectorsMap(liveAppCollectorsMap);
   }
-  
-  private void updateAppAggregatorsMap(NodeHeartbeatRequest request) {
-    Map<ApplicationId, String> registeredAggregatorsMap = 
-        request.getRegisteredAggregators();
-    if (registeredAggregatorsMap != null 
-        && !registeredAggregatorsMap.isEmpty()) {
+
+  /**
+   * Raises a collector update event for each application whose collector
+   * address in the heartbeat request differs from the one held by the RM.
+   */
+  private void updateAppCollectorsMap(NodeHeartbeatRequest request) {
+    Map<ApplicationId, String> registeredCollectorsMap =
+        request.getRegisteredCollectors();
+    if (registeredCollectorsMap != null
+        && !registeredCollectorsMap.isEmpty()) {
       Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
-      for (Map.Entry<ApplicationId, String> entry: 
-          registeredAggregatorsMap.entrySet()) {
+      for (Map.Entry<ApplicationId, String> entry:
+          registeredCollectorsMap.entrySet()) {
         ApplicationId appId = entry.getKey();
-        String aggregatorAddr = entry.getValue();
-        if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) {
+        String collectorAddr = entry.getValue();
+        if (collectorAddr != null && !collectorAddr.isEmpty()) {
           RMApp rmApp = rmApps.get(appId);
           if (rmApp == null) {
-            LOG.warn("Cannot update aggregator info because application ID: " + 
+            LOG.warn("Cannot update collector info because application ID: " +
                 appId + " is not found in RMContext!");
           } else {
-            String previousAggregatorAddr = rmApp.getAggregatorAddr();
-            if (previousAggregatorAddr == null || 
-                previousAggregatorAddr != aggregatorAddr) {
-              // sending aggregator update event.
-              RMAppAggregatorUpdateEvent event =
-                  new RMAppAggregatorUpdateEvent(appId, aggregatorAddr);
+            String previousCollectorAddr = rmApp.getCollectorAddr();
+            // Compare by value: an unchanged address must not raise an event.
+            if (previousCollectorAddr == null ||
+                !previousCollectorAddr.equals(collectorAddr)) {
+              // sending collector update event.
+              RMAppCollectorUpdateEvent event =
+                  new RMAppCollectorUpdateEvent(appId, collectorAddr);
               rmContext.getDispatcher().getEventHandler().handle(event);
             }
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index f81edb2..32cf2cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -172,23 +172,23 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * @return the tracking url for the application master.
    */
   String getTrackingUrl();
-  
+
   /**
-   * The aggregator address for the application.
-   * @return the address for the application's aggregator.
+   * The collector address for the application.
+   * @return the address for the application's collector.
    */
-  String getAggregatorAddr();
-  
+  String getCollectorAddr();
+
   /**
-   * Set aggregator address for the application
-   * @param aggregatorAddr the address of aggregator
+   * Set collector address for the application
+   * @param collectorAddr the address of collector
    */
-  void setAggregatorAddr(String aggregatorAddr);
-  
+  void setCollectorAddr(String collectorAddr);
+
   /**
-   * Remove aggregator address when application is finished or killed.
+   * Remove collector address when application is finished or killed.
    */
-  void removeAggregatorAddr();
+  void removeCollectorAddr();
 
   /**
    * The original tracking url for the application master.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
deleted file mode 100644
index b43de44..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-public class RMAppAggregatorUpdateEvent extends RMAppEvent {
-
-  private final String appAggregatorAddr;
-  
-  public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) {
-    super(appId, RMAppEventType.AGGREGATOR_UPDATE);
-    this.appAggregatorAddr = appAggregatorAddr;
-  }
-  
-  public String getAppAggregatorAddr(){
-    return this.appAggregatorAddr;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
new file mode 100644
index 0000000..698c9b5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppCollectorUpdateEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+/** COLLECTOR_UPDATE event carrying an application's collector address. */
+public class RMAppCollectorUpdateEvent extends RMAppEvent {
+  // Collector address delivered with this event.
+  private final String appCollectorAddr;
+  /** Creates a COLLECTOR_UPDATE event for the given application and address. */
+  public RMAppCollectorUpdateEvent(ApplicationId appId,
+      String appCollectorAddr) {
+    super(appId, RMAppEventType.COLLECTOR_UPDATE);
+    this.appCollectorAddr = appCollectorAddr;
+  }
+  /** Returns the collector address carried by this event. */
+  public String getAppCollectorAddr(){
+    return this.appCollectorAddr;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index 6e9460a..2b42638 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -30,9 +30,9 @@ public enum RMAppEventType {
 
   // Source: Scheduler
   APP_ACCEPTED,
-  
+
   // TODO add source later
-  AGGREGATOR_UPDATE,
+  COLLECTOR_UPDATE,
 
   // Source: RMAppAttempt
   ATTEMPT_REGISTERED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 6a076ac..61c5748 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -134,7 +134,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private long startTime;
   private long finishTime = 0;
   private long storedFinishTime = 0;
-  private String aggregatorAddr;
+  private String collectorAddr;
   // This field isn't protected by readlock now.
   private volatile RMAppAttempt currentAttempt;
   private String queue;
@@ -167,7 +167,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.NEW, RMAppState.NEW,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.NEW, RMAppState.NEW,
-        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppNewlySavingTransition())
     .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@@ -185,7 +185,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
-        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
         RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@@ -205,7 +205,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
-        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
         RMAppEventType.APP_REJECTED,
         new FinalSavingTransition(
@@ -223,7 +223,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
-        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED)
     .addTransition(RMAppState.ACCEPTED,
@@ -251,7 +251,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.MOVE, new RMAppMoveTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
-        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_UNREGISTERED,
         new FinalSavingTransition(
@@ -282,7 +282,7 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
-        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -295,7 +295,7 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
       EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -308,7 +308,7 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
     .addTransition(RMAppState.KILLING, RMAppState.KILLING,
-        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
+        RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
     .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_KILLED,
         new FinalSavingTransition(
@@ -505,20 +505,20 @@ public class RMAppImpl implements RMApp, Recoverable {
   public void setQueue(String queue) {
     this.queue = queue;
   }
-  
+
   @Override
-  public String getAggregatorAddr() {
-    return this.aggregatorAddr;
+  public String getCollectorAddr() {
+    return this.collectorAddr;
   }
-  
+
   @Override
-  public void setAggregatorAddr(String aggregatorAddr) {
-    this.aggregatorAddr = aggregatorAddr;
+  public void setCollectorAddr(String collectorAddr) {
+    this.collectorAddr = collectorAddr;
   }
-  
+
   @Override
-  public void removeAggregatorAddr() {
-    this.aggregatorAddr = null;
+  public void removeCollectorAddr() {
+    this.collectorAddr = null;
   }
 
   @Override
@@ -769,8 +769,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.diagnostics.append(appState.getDiagnostics());
     this.storedFinishTime = appState.getFinishTime();
     this.startTime = appState.getStartTime();
-    //TODO recover aggregator address.
-    //this.aggregatorAddr = appState.getAggregatorAddr();
+    //TODO recover collector address.
+    //this.collectorAddr = appState.getCollectorAddr();
 
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
@@ -814,22 +814,22 @@ public class RMAppImpl implements RMApp, Recoverable {
     };
   }
 
-  private static final class RMAppAggregatorUpdateTransition 
+  private static final class RMAppCollectorUpdateTransition
       extends RMAppTransition {
-  
+
     public void transition(RMAppImpl app, RMAppEvent event) {
-      LOG.info("Updating aggregator info for app: " + app.getApplicationId());
-    
-      RMAppAggregatorUpdateEvent appAggregatorUpdateEvent = 
-          (RMAppAggregatorUpdateEvent) event;
-      // Update aggregator address
-      app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr());
-      
+      LOG.info("Updating collector info for app: " + app.getApplicationId());
+
+      RMAppCollectorUpdateEvent appCollectorUpdateEvent =
+          (RMAppCollectorUpdateEvent) event;
+      // Update collector address
+      app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
+
       // TODO persistent to RMStateStore for recover
       // Save to RMStateStore
     };
   }
-  
+
   private static final class RMAppNodeUpdateTransition extends RMAppTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
       RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 0d0895a..2e1a27e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -92,15 +92,15 @@ public abstract class MockAsm extends MockApps {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
-    public String getAggregatorAddr() {
+    public String getCollectorAddr() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
-    public void setAggregatorAddr(String aggregatorAddr) {
+    public void setCollectorAddr(String collectorAddr) {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
-    public void removeAggregatorAddr() {
+    public void removeCollectorAddr() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 96952d2..58dcacf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -273,17 +273,17 @@ public class MockRMApp implements RMApp {
   }
 
   @Override
-  public String getAggregatorAddr() {
+  public String getCollectorAddr() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
-  
+
   @Override
-  public void removeAggregatorAddr() {
+  public void removeCollectorAddr() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
 
   @Override
-  public void setAggregatorAddr(String aggregatorAddr) {
+  public void setCollectorAddr(String collectorAddr) {
     throw new UnsupportedOperationException("Not supported yet.");
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
index ae5efa5..1fef76e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
@@ -56,6 +56,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 32ee5d8..fab131c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -1,25 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.yarn.server.timelineservice;
 
 
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.junit.Assert.fail;
-
 public class TestTimelineServiceClientIntegration {
-  private static PerNodeTimelineAggregatorsAuxService auxService;
+  private static TimelineCollectorManager collectorManager;
+  private static PerNodeTimelineCollectorsAuxService auxService;
 
   @BeforeClass
   public static void setupClass() throws Exception {
     try {
-      auxService = PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
+      collectorManager = new MyTimelineCollectorManager();
+      auxService =
+          PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
+              collectorManager);
       auxService.addApplication(ApplicationId.newInstance(0, 1));
     } catch (ExitUtil.ExitException e) {
       fail();
@@ -38,6 +63,9 @@ public class TestTimelineServiceClientIntegration {
     TimelineClient client =
         TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
     try {
+      // set the timeline service address manually
+      client.setTimelineServiceAddress(
+          collectorManager.getRestServerBindAddress());
       client.init(new YarnConfiguration());
       client.start();
       TimelineEntity entity = new TimelineEntity();
@@ -45,10 +73,20 @@ public class TestTimelineServiceClientIntegration {
       entity.setId("test entity id");
       client.putEntities(entity);
       client.putEntitiesAsync(entity);
-    } catch(Exception e) {
-      fail();
     } finally {
       client.stop();
     }
   }
+
+  private static class MyTimelineCollectorManager extends
+      TimelineCollectorManager {
+    public MyTimelineCollectorManager() {
+      super();
+    }
+
+    @Override
+    protected CollectorNodemanagerProtocol getNMCollectorService() {
+      return mock(CollectorNodemanagerProtocol.class);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index 26790f1..f974aee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -58,6 +58,11 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
     </dependency>
 
@@ -72,6 +77,11 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>javax.servlet</groupId>
       <artifactId>servlet-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
deleted file mode 100644
index 95ec9f8..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelTimelineAggregator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage for a given YARN application.
- *
- * App-related lifecycle management is handled by this service.
- */
-@Private
-@Unstable
-public class AppLevelTimelineAggregator extends TimelineAggregator {
-  private final String applicationId;
-  // TODO define key metadata such as flow metadata, user, and queue
-
-  public AppLevelTimelineAggregator(String applicationId) {
-    super(AppLevelTimelineAggregator.class.getName() + " - " + applicationId);
-    this.applicationId = applicationId;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
deleted file mode 100644
index 19920fd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.nio.ByteBuffer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.hadoop.yarn.server.api.ContainerContext;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * The top-level server for the per-node timeline aggregator collection. Currently
- * it is defined as an auxiliary service to accommodate running within another
- * daemon (e.g. node manager).
- */
-@Private
-@Unstable
-public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
-  private static final Log LOG =
-      LogFactory.getLog(PerNodeTimelineAggregatorsAuxService.class);
-  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
-  private final TimelineAggregatorsCollection aggregatorCollection;
-
-  public PerNodeTimelineAggregatorsAuxService() {
-    // use the same singleton
-    this(TimelineAggregatorsCollection.getInstance());
-  }
-
-  @VisibleForTesting PerNodeTimelineAggregatorsAuxService(
-      TimelineAggregatorsCollection aggregatorCollection) {
-    super("timeline_aggregator");
-    this.aggregatorCollection = aggregatorCollection;
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    aggregatorCollection.init(conf);
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    aggregatorCollection.start();
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    aggregatorCollection.stop();
-    super.serviceStop();
-  }
-
-  // these methods can be used as the basis for future service methods if the
-  // per-node aggregator runs separate from the node manager
-  /**
-   * Creates and adds an app level aggregator for the specified application id.
-   * The aggregator is also initialized and started. If the service already
-   * exists, no new service is created.
-   *
-   * @return whether it was added successfully
-   */
-  public boolean addApplication(ApplicationId appId) {
-    AppLevelTimelineAggregator aggregator =
-        new AppLevelTimelineAggregator(appId.toString());
-    return (aggregatorCollection.putIfAbsent(appId, aggregator)
-        == aggregator);
-  }
-
-  /**
-   * Removes the app level aggregator for the specified application id. The
-   * aggregator is also stopped as a result. If the aggregator does not exist, no
-   * change is made.
-   *
-   * @return whether it was removed successfully
-   */
-  public boolean removeApplication(ApplicationId appId) {
-    String appIdString = appId.toString();
-    return aggregatorCollection.remove(appIdString);
-  }
-
-  /**
-   * Creates and adds an app level aggregator for the specified application id.
-   * The aggregator is also initialized and started. If the aggregator already
-   * exists, no new aggregator is created.
-   */
-  @Override
-  public void initializeContainer(ContainerInitializationContext context) {
-    // intercept the event of the AM container being created and initialize the
-    // app level aggregator service
-    if (isApplicationMaster(context)) {
-      ApplicationId appId = context.getContainerId().
-          getApplicationAttemptId().getApplicationId();
-      addApplication(appId);
-    }
-  }
-
-  /**
-   * Removes the app level aggregator for the specified application id. The
-   * aggregator is also stopped as a result. If the aggregator does not exist, no
-   * change is made.
-   */
-  @Override
-  public void stopContainer(ContainerTerminationContext context) {
-    // intercept the event of the AM container being stopped and remove the app
-    // level aggregator service
-    if (isApplicationMaster(context)) {
-      ApplicationId appId = context.getContainerId().
-          getApplicationAttemptId().getApplicationId();
-      removeApplication(appId);
-    }
-  }
-
-  private boolean isApplicationMaster(ContainerContext context) {
-    // TODO this is based on a (shaky) assumption that the container id (the
-    // last field of the full container id) for an AM is always 1
-    // we want to make this much more reliable
-    ContainerId containerId = context.getContainerId();
-    return containerId.getContainerId() == 1L;
-  }
-
-  @VisibleForTesting
-  boolean hasApplication(String appId) {
-    return aggregatorCollection.containsKey(appId);
-  }
-
-  @Override
-  public void initializeApplication(ApplicationInitializationContext context) {
-  }
-
-  @Override
-  public void stopApplication(ApplicationTerminationContext context) {
-  }
-
-  @Override
-  public ByteBuffer getMetaData() {
-    // TODO currently it is not used; we can return a more meaningful data when
-    // we connect it with an AM
-    return ByteBuffer.allocate(0);
-  }
-
-  @VisibleForTesting
-  public static PerNodeTimelineAggregatorsAuxService launchServer(String[] args) {
-    Thread
-      .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
-    StringUtils.startupShutdownMessage(PerNodeTimelineAggregatorsAuxService.class, args,
-        LOG);
-    PerNodeTimelineAggregatorsAuxService auxService = null;
-    try {
-      auxService = new PerNodeTimelineAggregatorsAuxService();
-      ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
-          SHUTDOWN_HOOK_PRIORITY);
-      YarnConfiguration conf = new YarnConfiguration();
-      auxService.init(conf);
-      auxService.start();
-    } catch (Throwable t) {
-      LOG.fatal("Error starting PerNodeAggregatorServer", t);
-      ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
-    }
-    return auxService;
-  }
-
-  private static class ShutdownHook implements Runnable {
-    private final PerNodeTimelineAggregatorsAuxService auxService;
-
-    public ShutdownHook(PerNodeTimelineAggregatorsAuxService auxService) {
-      this.auxService = auxService;
-    }
-
-    public void run() {
-      auxService.stop();
-    }
-  }
-
-  public static void main(String[] args) {
-    launchServer(args);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
deleted file mode 100644
index dbd0895..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
-import org.apache.hadoop.util.ReflectionUtils;
-/**
- * Service that handles writes to the timeline service and writes them to the
- * backing storage.
- *
- * Classes that extend this can add their own lifecycle management or
- * customization of request handling.
- */
-@Private
-@Unstable
-public abstract class TimelineAggregator extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
-
-  private TimelineWriter writer;
-
-  public TimelineAggregator(String name) {
-    super(name);
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-    writer = ReflectionUtils.newInstance(conf.getClass(
-        YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
-        FileSystemTimelineWriterImpl.class,
-        TimelineWriter.class), conf);
-    writer.init(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-    writer.stop();
-  }
-
-  public TimelineWriter getWriter() {
-    return writer;
-  }
-
-  /**
-   * Handles entity writes. These writes are synchronous and are written to the
-   * backing storage without buffering/batching. If any entity already exists,
-   * it results in an update of the entity.
-   *
-   * This method should be reserved for selected critical entities and events.
-   * For normal voluminous writes one should use the async method
-   * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
-   *
-   * @param entities entities to post
-   * @param callerUgi the caller UGI
-   * @return the response that contains the result of the post.
-   */
-  public TimelineWriteResponse postEntities(TimelineEntities entities,
-      UserGroupInformation callerUgi) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
-      LOG.debug("postEntities(entities=" + entities + ", callerUgi="
-          + callerUgi + ")");
-    }
-
-    return writer.write(entities);
-  }
-
-  /**
-   * Handles entity writes in an asynchronous manner. The method returns as soon
-   * as validation is done. No promises are made on how quickly it will be
-   * written to the backing storage or if it will always be written to the
-   * backing storage. Multiple writes to the same entities may be batched and
-   * appropriate values updated and result in fewer writes to the backing
-   * storage.
-   *
-   * @param entities entities to post
-   * @param callerUgi the caller UGI
-   */
-  public void postEntitiesAsync(TimelineEntities entities,
-      UserGroupInformation callerUgi) {
-    // TODO implement
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
-          callerUgi + ")");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
deleted file mode 100644
index 7d42f94..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorWebService.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.*;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.webapp.ForbiddenException;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
-
-import com.google.inject.Singleton;
-
-/**
- * The main per-node REST end point for timeline service writes. It is
- * essentially a container service that routes requests to the appropriate
- * per-app services.
- */
-@Private
-@Unstable
-@Singleton
-@Path("/ws/v2/timeline")
-public class TimelineAggregatorWebService {
-  private static final Log LOG =
-      LogFactory.getLog(TimelineAggregatorWebService.class);
-
-  private @Context ServletContext context;
-
-  @XmlRootElement(name = "about")
-  @XmlAccessorType(XmlAccessType.NONE)
-  @Public
-  @Unstable
-  public static class AboutInfo {
-
-    private String about;
-
-    public AboutInfo() {
-
-    }
-
-    public AboutInfo(String about) {
-      this.about = about;
-    }
-
-    @XmlElement(name = "About")
-    public String getAbout() {
-      return about;
-    }
-
-    public void setAbout(String about) {
-      this.about = about;
-    }
-
-  }
-
-  /**
-   * Return the description of the timeline web services.
-   */
-  @GET
-  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public AboutInfo about(
-      @Context HttpServletRequest req,
-      @Context HttpServletResponse res) {
-    init(res);
-    return new AboutInfo("Timeline API");
-  }
-
-  /**
-   * Accepts writes to the aggregator, and returns a response. It simply routes
-   * the request to the app level aggregator. It expects an application as a
-   * context.
-   */
-  @PUT
-  @Path("/entities")
-  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public Response putEntities(
-      @Context HttpServletRequest req,
-      @Context HttpServletResponse res,
-      @QueryParam("async") String async,
-      @QueryParam("appid") String appId,
-      TimelineEntities entities) {
-    init(res);
-    UserGroupInformation callerUgi = getUser(req);
-    if (callerUgi == null) {
-      String msg = "The owner of the posted timeline entities is not set";
-      LOG.error(msg);
-      throw new ForbiddenException(msg);
-    }
-
-    // TODO how to express async posts and handle them
-    boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
-
-    try {
-      appId = parseApplicationId(appId);
-      if (appId == null) {
-        return Response.status(Response.Status.BAD_REQUEST).build();
-      }
-      TimelineAggregator service = getAggregatorService(req, appId);
-      if (service == null) {
-        LOG.error("Application not found");
-        throw new NotFoundException(); // different exception?
-      }
-      service.postEntities(entities, callerUgi);
-      return Response.ok().build();
-    } catch (Exception e) {
-      LOG.error("Error putting entities", e);
-      throw new WebApplicationException(e,
-          Response.Status.INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  private String parseApplicationId(String appId) {
-    // Make sure the appId is not null and is valid
-    ApplicationId appID;
-    try {
-      if (appId != null) {
-        return ConverterUtils.toApplicationId(appId.trim()).toString();
-      } else {
-        return null;
-      }
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
-  private TimelineAggregator
-      getAggregatorService(HttpServletRequest req, String appIdToParse) {
-    String appIdString = parseApplicationId(appIdToParse);
-    final TimelineAggregatorsCollection aggregatorCollection =
-        (TimelineAggregatorsCollection) context.getAttribute(
-            TimelineAggregatorsCollection.AGGREGATOR_COLLECTION_ATTR_KEY);
-    return aggregatorCollection.get(appIdString);
-  }
-
-  private void init(HttpServletResponse response) {
-    response.setContentType(null);
-  }
-
-  private UserGroupInformation getUser(HttpServletRequest req) {
-    String remoteUser = req.getRemoteUser();
-    UserGroupInformation callerUgi = null;
-    if (remoteUser != null) {
-      callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
-    }
-    return callerUgi;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
deleted file mode 100644
index d6e2a18..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.HttpServer2;
-import org.apache.hadoop.http.lib.StaticUserWebFilter;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
-import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
-
-/**
- * Class that manages adding and removing aggregators and their lifecycle. It
- * provides thread safety access to the aggregators inside.
- *
- * It is a singleton, and instances should be obtained via
- * {@link #getInstance()}.
- */
-@Private
-@Unstable
-public class TimelineAggregatorsCollection extends CompositeService {
-  private static final Log LOG =
-      LogFactory.getLog(TimelineAggregatorsCollection.class);
-  private static final TimelineAggregatorsCollection INSTANCE =
-      new TimelineAggregatorsCollection();
-
-  // access to this map is synchronized with the map itself
-  private final Map<String, TimelineAggregator> aggregators =
-      Collections.synchronizedMap(
-          new HashMap<String, TimelineAggregator>());
-
-  // REST server for this aggregator collection
-  private HttpServer2 timelineRestServer;
-  
-  private String timelineRestServerBindAddress;
-  
-  private AggregatorNodemanagerProtocol nmAggregatorService;
-  
-  private InetSocketAddress nmAggregatorServiceAddress;
-
-  static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
-
-  static TimelineAggregatorsCollection getInstance() {
-    return INSTANCE;
-  }
-
-  TimelineAggregatorsCollection() {
-    super(TimelineAggregatorsCollection.class.getName());
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    this.nmAggregatorServiceAddress = conf.getSocketAddr(
-        YarnConfiguration.NM_BIND_HOST,
-        YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
-    
-  }
-  
-  @Override
-  protected void serviceStart() throws Exception {
-    startWebApp();
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    if (timelineRestServer != null) {
-      timelineRestServer.stop();
-    }
-    super.serviceStop();
-  }
-
-  /**
-   * Put the aggregator into the collection if an aggregator mapped by id does
-   * not exist.
-   *
-   * @throws YarnRuntimeException if there was any exception in initializing and
-   * starting the app level service
-   * @return the aggregator associated with id after the potential put.
-   */
-  public TimelineAggregator putIfAbsent(ApplicationId appId, 
-      TimelineAggregator aggregator) {
-    String id = appId.toString();
-    TimelineAggregator aggregatorInTable;
-    boolean aggregatorIsNew = false;
-    synchronized (aggregators) {
-      aggregatorInTable = aggregators.get(id);
-      if (aggregatorInTable == null) {
-        try {
-          // initialize, start, and add it to the collection so it can be
-          // cleaned up when the parent shuts down
-          aggregator.init(getConfig());
-          aggregator.start();
-          aggregators.put(id, aggregator);
-          LOG.info("the aggregator for " + id + " was added");
-          aggregatorInTable = aggregator;
-          aggregatorIsNew = true;
-        } catch (Exception e) {
-          throw new YarnRuntimeException(e);
-        }
-      } else {
-        String msg = "the aggregator for " + id + " already exists!";
-        LOG.error(msg);
-      }
-      
-    }
-    // Report to NM if a new aggregator is added.
-    if (aggregatorIsNew) {
-      try {
-        reportNewAggregatorToNM(appId);
-      } catch (Exception e) {
-        // throw exception here as it cannot be used if failed report to NM
-        LOG.error("Failed to report a new aggregator for application: " + appId + 
-            " to NM Aggregator Services.");
-        throw new YarnRuntimeException(e);
-      }
-    }
-    
-    return aggregatorInTable;
-  }
-
-  /**
-   * Removes the aggregator for the specified id. The aggregator is also stopped
-   * as a result. If the aggregator does not exist, no change is made.
-   *
-   * @return whether it was removed successfully
-   */
-  public boolean remove(String id) {
-    synchronized (aggregators) {
-      TimelineAggregator aggregator = aggregators.remove(id);
-      if (aggregator == null) {
-        String msg = "the aggregator for " + id + " does not exist!";
-        LOG.error(msg);
-        return false;
-      } else {
-        // stop the service to do clean up
-        aggregator.stop();
-        LOG.info("the aggregator service for " + id + " was removed");
-        return true;
-      }
-    }
-  }
-
-  /**
-   * Returns the aggregator for the specified id.
-   *
-   * @return the aggregator or null if it does not exist
-   */
-  public TimelineAggregator get(String id) {
-    return aggregators.get(id);
-  }
-
-  /**
-   * Returns whether the aggregator for the specified id exists in this
-   * collection.
-   */
-  public boolean containsKey(String id) {
-    return aggregators.containsKey(id);
-  }
-
-  /**
-   * Launch the REST web server for this aggregator collection
-   */
-  private void startWebApp() {
-    Configuration conf = getConfig();
-    // use the same ports as the old ATS for now; we could create new properties
-    // for the new timeline service if needed
-    String bindAddress = WebAppUtils.getWebAppBindURL(conf,
-        YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
-        WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
-    this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
-        NetUtils.createSocketAddr(bindAddress));
-    LOG.info("Instantiating the per-node aggregator webapp at " + 
-        timelineRestServerBindAddress);
-    try {
-      Configuration confForInfoServer = new Configuration(conf);
-      confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
-      HttpServer2.Builder builder = new HttpServer2.Builder()
-          .setName("timeline")
-          .setConf(conf)
-          .addEndpoint(URI.create("http://" + bindAddress));
-      timelineRestServer = builder.build();
-      // TODO: replace this by an authentication filter in future.
-      HashMap<String, String> options = new HashMap<>();
-      String username = conf.get(HADOOP_HTTP_STATIC_USER,
-          DEFAULT_HADOOP_HTTP_STATIC_USER);
-      options.put(HADOOP_HTTP_STATIC_USER, username);
-      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
-          "static_user_filter_timeline",
-          StaticUserWebFilter.StaticUserFilter.class.getName(),
-          options, new String[] {"/*"});
-
-      timelineRestServer.addJerseyResourcePackage(
-          TimelineAggregatorWebService.class.getPackage().getName() + ";"
-              + GenericExceptionHandler.class.getPackage().getName() + ";"
-              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
-          "/*");
-      timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
-          TimelineAggregatorsCollection.getInstance());
-      timelineRestServer.start();
-    } catch (Exception e) {
-      String msg = "The per-node aggregator webapp failed to start.";
-      LOG.error(msg, e);
-      throw new YarnRuntimeException(msg, e);
-    }
-  }
-  
-  private void reportNewAggregatorToNM(ApplicationId appId) 
-      throws YarnException, IOException {
-    this.nmAggregatorService = getNMAggregatorService();
-    ReportNewAggregatorsInfoRequest request = 
-        ReportNewAggregatorsInfoRequest.newInstance(appId,
-            this.timelineRestServerBindAddress);
-    LOG.info("Report a new aggregator for application: " + appId + 
-        " to NM Aggregator Services.");
-    nmAggregatorService.reportNewAggregatorInfo(request);
-  }
-  
-  // protected for test
-  protected AggregatorNodemanagerProtocol getNMAggregatorService(){
-    Configuration conf = getConfig();
-    final YarnRPC rpc = YarnRPC.create(conf);
-    
-    // TODO Security settings.
-    return (AggregatorNodemanagerProtocol) rpc.getProxy(
-        AggregatorNodemanagerProtocol.class,
-        nmAggregatorServiceAddress, conf);
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dda84085/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
new file mode 100644
index 0000000..7d59876
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage for a given YARN application. App-related lifecycle
+ * management is handled by this service; at present each lifecycle hook
+ * below only delegates to the superclass.
+ */
+@Private
+@Unstable
+public class AppLevelTimelineCollector extends TimelineCollector {
+  private final String applicationId; // also part of the name passed to super
+  // TODO define key metadata such as flow metadata, user, and queue
+
+  public AppLevelTimelineCollector(String applicationId) {
+    super(AppLevelTimelineCollector.class.getName() + " - " + applicationId);
+    this.applicationId = applicationId; // not read in this class yet
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf); // pass-through; no app-specific init yet
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart(); // pass-through; no app-specific start-up yet
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop(); // pass-through; no app-specific clean-up yet
+  }
+
+}