You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/18 04:34:41 UTC

[49/50] hadoop git commit: YARN-3039. Implemented the app-level timeline aggregator discovery service. Contributed by Junping Du.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
new file mode 100644
index 0000000..eb7beef
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppAggregatorsMapPBImpl;
+
+/**
+ * Protocol-buffer backed implementation of
+ * {@link ReportNewAggregatorsInfoRequest}.
+ *
+ * <p>The record's state lives in the immutable {@code proto} while
+ * {@code viaProto} is true and in the mutable {@code builder} otherwise.
+ * The aggregator list is additionally cached in {@code aggregatorsList} and
+ * is written into the builder every time {@link #getProto()} is called.
+ */
+public class ReportNewAggregatorsInfoRequestPBImpl extends
+    ReportNewAggregatorsInfoRequest {
+
+  ReportNewAggregatorsInfoRequestProto proto =
+      ReportNewAggregatorsInfoRequestProto.getDefaultInstance();
+
+  ReportNewAggregatorsInfoRequestProto.Builder builder = null;
+  boolean viaProto = false;
+
+  // Local view of the repeated app_aggregators field; null until it is first
+  // read through getAppAggregatorsList() or assigned by the setter.
+  private List<AppAggregatorsMap> aggregatorsList = null;
+
+  /** Creates an empty request backed by a fresh builder. */
+  public ReportNewAggregatorsInfoRequestPBImpl() {
+    builder = ReportNewAggregatorsInfoRequestProto.newBuilder();
+  }
+
+  /**
+   * Wraps an existing protobuf message.
+   *
+   * @param proto the message this record reads its state from
+   */
+  public ReportNewAggregatorsInfoRequestPBImpl(
+      ReportNewAggregatorsInfoRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Folds the locally cached list into the builder and returns the resulting
+   * message.
+   *
+   * @return the protobuf message reflecting the current state of this record
+   */
+  public ReportNewAggregatorsInfoRequestProto getProto() {
+    // mergeLocalToProto() always finishes with a freshly built proto and
+    // viaProto == true, so there is nothing left to build here.
+    mergeLocalToProto();
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  /**
+   * Returns the single-line protobuf text form of this record, matching the
+   * {@code toString()} of the other PBImpl records introduced alongside it.
+   */
+  @Override
+  public String toString() {
+    // Fully qualified because this file does not import TextFormat.
+    return com.google.protobuf.TextFormat.shortDebugString(getProto());
+  }
+
+  /** Rebuilds {@code proto} from the builder plus the cached list. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Writes the cached list into the builder when one has been set or read. */
+  private void mergeLocalToBuilder() {
+    if (aggregatorsList != null) {
+      addLocalAggregatorsToProto();
+    }
+  }
+
+  /**
+   * Makes the builder the authoritative state, seeding it from {@code proto}
+   * when the record was proto-backed or no builder exists yet.
+   */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ReportNewAggregatorsInfoRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Replaces the builder's repeated field with the cached list. */
+  private void addLocalAggregatorsToProto() {
+    maybeInitBuilder();
+    builder.clearAppAggregators();
+    List<AppAggregatorsMapProto> protoList =
+        new ArrayList<AppAggregatorsMapProto>();
+    for (AppAggregatorsMap m : this.aggregatorsList) {
+      protoList.add(convertToProtoFormat(m));
+    }
+    builder.addAllAppAggregators(protoList);
+  }
+
+  /** Populates the cached list from whichever of proto/builder is current. */
+  private void initLocalAggregatorsList() {
+    ReportNewAggregatorsInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
+    // Named differently from the field so the two are not confused.
+    List<AppAggregatorsMapProto> protoEntries = p.getAppAggregatorsList();
+    this.aggregatorsList = new ArrayList<AppAggregatorsMap>();
+    for (AppAggregatorsMapProto m : protoEntries) {
+      this.aggregatorsList.add(convertFromProtoFormat(m));
+    }
+  }
+
+  @Override
+  public List<AppAggregatorsMap> getAppAggregatorsList() {
+    if (this.aggregatorsList == null) {
+      initLocalAggregatorsList();
+    }
+    return this.aggregatorsList;
+  }
+
+  @Override
+  public void setAppAggregatorsList(List<AppAggregatorsMap> appAggregatorsList) {
+    maybeInitBuilder();
+    if (appAggregatorsList == null) {
+      // A null list means "unset": drop whatever the builder currently holds.
+      builder.clearAppAggregators();
+    }
+    this.aggregatorsList = appAggregatorsList;
+  }
+
+  private AppAggregatorsMapPBImpl convertFromProtoFormat(
+      AppAggregatorsMapProto p) {
+    return new AppAggregatorsMapPBImpl(p);
+  }
+
+  private AppAggregatorsMapProto convertToProtoFormat(
+      AppAggregatorsMap m) {
+    // Entries must be AppAggregatorsMapPBImpl instances for this cast to hold.
+    return ((AppAggregatorsMapPBImpl) m).getProto();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
new file mode 100644
index 0000000..0f0925a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol-buffer backed implementation of
+ * {@link ReportNewAggregatorsInfoResponse}. The record holds its state
+ * either in {@code proto} (when {@code viaProto} is true) or in
+ * {@code builder}.
+ */
+@Private
+@Unstable
+public class ReportNewAggregatorsInfoResponsePBImpl extends
+    ReportNewAggregatorsInfoResponse {
+
+  ReportNewAggregatorsInfoResponseProto proto =
+      ReportNewAggregatorsInfoResponseProto.getDefaultInstance();
+
+  ReportNewAggregatorsInfoResponseProto.Builder builder = null;
+
+  boolean viaProto = false;
+
+  /** Creates an empty response backed by a fresh builder. */
+  public ReportNewAggregatorsInfoResponsePBImpl() {
+    builder = ReportNewAggregatorsInfoResponseProto.newBuilder();
+  }
+
+  /**
+   * Wraps an existing protobuf message.
+   *
+   * @param proto the message this record reads its state from
+   */
+  public ReportNewAggregatorsInfoResponsePBImpl(ReportNewAggregatorsInfoResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Returns the protobuf message for this record, building it from the
+   * builder first when the record is not already proto-backed.
+   */
+  public ReportNewAggregatorsInfoResponseProto getProto() {
+    if (!viaProto) {
+      proto = builder.build();
+      viaProto = true;
+    }
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (!other.getClass().isAssignableFrom(this.getClass())) {
+      return false;
+    }
+    return this.getProto().equals(this.getClass().cast(other).getProto());
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
new file mode 100644
index 0000000..67c377d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
@@ -0,0 +1,33 @@
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ * Abstract record carrying an {@link ApplicationId} together with an
+ * aggregator address string.
+ */
+@Private
+public abstract class AppAggregatorsMap {
+
+  /**
+   * Creates a record through {@link Records#newRecord(Class)} and fills in
+   * both fields.
+   *
+   * @param id the application id to store
+   * @param aggregatorAddr the aggregator address to store
+   * @return the populated record
+   */
+  public static AppAggregatorsMap newInstance(
+      ApplicationId id, String aggregatorAddr) {
+    AppAggregatorsMap record = Records.newRecord(AppAggregatorsMap.class);
+    record.setApplicationId(id);
+    record.setAggregatorAddr(aggregatorAddr);
+    return record;
+  }
+
+  /** @return the application id held by this record */
+  public abstract ApplicationId getApplicationId();
+
+  /** @param id the application id to store in this record */
+  public abstract void setApplicationId(ApplicationId id);
+
+  /** @return the aggregator address held by this record */
+  public abstract String getAggregatorAddr();
+
+  /** @param addr the aggregator address to store in this record */
+  public abstract void setAggregatorAddr(String addr);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
new file mode 100644
index 0000000..32903e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
@@ -0,0 +1,151 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+@Private
+@Unstable
+public class AppAggregatorsMapPBImpl extends AppAggregatorsMap {
+
+  AppAggregatorsMapProto proto = 
+      AppAggregatorsMapProto.getDefaultInstance();
+  
+  AppAggregatorsMapProto.Builder builder = null;
+  boolean viaProto = false;
+  
+  private ApplicationId appId = null;
+  private String aggregatorAddr = null;
+  
+  public AppAggregatorsMapPBImpl() {
+    builder = AppAggregatorsMapProto.newBuilder();
+  }
+
+  public AppAggregatorsMapPBImpl(AppAggregatorsMapProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+  
+  public AppAggregatorsMapProto getProto() {
+      mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null)
+      return false;
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+  
+  @Override
+  public ApplicationId getApplicationId() {
+    AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.appId == null && p.hasAppId()) {
+      this.appId = convertFromProtoFormat(p.getAppId());
+    }
+    return this.appId;
+  }
+  
+  @Override
+  public String getAggregatorAddr() {
+    AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.aggregatorAddr == null 
+        && p.hasAppAggregatorAddr()) {
+      this.aggregatorAddr = p.getAppAggregatorAddr();
+    }
+    return this.aggregatorAddr;
+  }
+
+  @Override
+  public void setApplicationId(ApplicationId appId) {
+    maybeInitBuilder();
+    if (appId == null) {
+      builder.clearAppId();
+    }
+    this.appId = appId;
+  }
+  
+  @Override
+  public void setAggregatorAddr(String aggregatorAddr) {
+    maybeInitBuilder();
+    if (aggregatorAddr == null) {
+      builder.clearAppAggregatorAddr();
+    }
+    this.aggregatorAddr = aggregatorAddr;
+  }
+  
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+  
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+  
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = AppAggregatorsMapProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+  
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+  
+  private void mergeLocalToBuilder() {
+    if (this.appId != null) {
+      builder.setAppId(convertToProtoFormat(this.appId));
+    }
+    if (this.aggregatorAddr != null) {
+      builder.setAppAggregatorAddr(this.aggregatorAddr);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
new file mode 100644
index 0000000..d7b05c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "AggregatorNodemanagerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_server_common_service_protos.proto";
+
+// RPC service exposing a single call, reportNewAggregatorInfo, which takes a
+// ReportNewAggregatorsInfoRequestProto and returns a
+// ReportNewAggregatorsInfoResponseProto (both declared in
+// yarn_server_common_service_protos.proto, imported above).
+service AggregatorNodemanagerProtocolService {
+  rpc reportNewAggregatorInfo (ReportNewAggregatorsInfoRequestProto) returns (ReportNewAggregatorsInfoResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 91473c5..3b03f58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -47,6 +47,7 @@ message NodeHeartbeatRequestProto {
   optional NodeStatusProto node_status = 1;
   optional MasterKeyProto last_known_container_token_master_key = 2;
   optional MasterKeyProto last_known_nm_token_master_key = 3;
+  repeated AppAggregatorsMapProto registered_aggregators = 4;
 }
 
 message NodeHeartbeatResponseProto {
@@ -60,6 +61,7 @@ message NodeHeartbeatResponseProto {
   optional string diagnostics_message = 8;
   repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
   repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
+  repeated AppAggregatorsMapProto app_aggregators_map = 11;
 }
 
 message SystemCredentialsForAppsProto {
@@ -67,6 +69,25 @@ message SystemCredentialsForAppsProto {
   optional bytes credentialsForApp = 2;
 }
 
+////////////////////////////////////////////////////////////////////////
+////// From aggregator_nodemanager_protocol ////////////////////////////
+////////////////////////////////////////////////////////////////////////
+// One entry pairing an application id with an address string
+// (presumably the address of that application's aggregator).
+message AppAggregatorsMapProto {
+  optional ApplicationIdProto appId = 1;
+  optional string appAggregatorAddr = 2;
+}
+
+//////////////////////////////////////////////////////
+/////// aggregator_nodemanager_protocol //////////////
+//////////////////////////////////////////////////////
+// Request carrying zero or more AppAggregatorsMapProto entries.
+message ReportNewAggregatorsInfoRequestProto {
+  repeated AppAggregatorsMapProto app_aggregators = 1;
+}
+
+// Response to ReportNewAggregatorsInfoRequestProto; declares no fields.
+message ReportNewAggregatorsInfoResponseProto {
+}
+
+
 message NMContainerStatusProto {
   optional ContainerIdProto container_id = 1;
   optional ContainerStateProto container_state = 2;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
new file mode 100644
index 0000000..af9d60f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -0,0 +1,345 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRPC {
+
+  private static final String EXCEPTION_MSG = "test error";
+  private static final String EXCEPTION_CAUSE = "exception cause";
+  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  
+  public static final String ILLEGAL_NUMBER_MESSAGE = 
+      "aggregators' number in ReportNewAggregatorsInfoRequest is not ONE.";
+  
+  public static final String DEFAULT_AGGREGATOR_ADDR = "localhost:0";
+  
+  public static final ApplicationId DEFAULT_APP_ID = 
+      ApplicationId.newInstance(0, 0);
+  
+  /**
+   * Starts a server that hosts only {@link ContainerManagementProtocol},
+   * calls it through an {@link ApplicationClientProtocol} proxy and checks
+   * that the call does not succeed. When the failure surfaces as a
+   * {@link YarnException}, its message must name the unknown method.
+   */
+  @Test
+  public void testUnknownCall() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    Server server = rpc.getServer(ContainerManagementProtocol.class,
+        new DummyContainerManager(), addr, conf, null, 1);
+    server.start();
+
+    // Everything after start() runs under try/finally so the server is
+    // stopped even when creating the proxy fails.
+    try {
+      // Any unrelated protocol would do
+      ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy(
+          ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
+
+      try {
+        proxy.getNewApplication(Records
+            .newRecord(GetNewApplicationRequest.class));
+        Assert.fail("Expected RPC call to fail with unknown method.");
+      } catch (YarnException e) {
+        Assert.assertTrue(e.getMessage().matches(
+            "Unknown method getNewApplication called on.*"
+                + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
+                + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
+      } catch (Exception e) {
+        // NOTE(review): any other exception type is only printed, so the test
+        // passes without checking the message in that case - confirm this
+        // leniency is intended before tightening it.
+        e.printStackTrace();
+      }
+    } finally {
+      server.stop();
+    }
+  }
+  
+  /**
+   * Exercises {@link AggregatorNodemanagerProtocol} over RPC against a
+   * {@code DummyNMAggregatorService}:
+   * <ol>
+   *   <li>a call through an unrelated protocol proxy must not succeed;</li>
+   *   <li>a request built from {@link #DEFAULT_APP_ID} and
+   *       {@link #DEFAULT_AGGREGATOR_ADDR} must be accepted;</li>
+   *   <li>an empty request must be rejected with a {@link YarnException}
+   *       containing {@link #ILLEGAL_NUMBER_MESSAGE}.</li>
+   * </ol>
+   *
+   * @throws IOException if an RPC call fails at the transport level
+   */
+  @Test
+  public void testRPCOnAggregatorNodeManagerProtocol() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    Server server = rpc.getServer(AggregatorNodemanagerProtocol.class,
+        new DummyNMAggregatorService(), addr, conf, null, 1);
+    server.start();
+
+    // The server is stopped in a finally block so that a failed assertion or
+    // an unexpected exception below cannot leave it running.
+    try {
+      // Test unrelated protocol wouldn't get response
+      ApplicationClientProtocol unknownProxy = (ApplicationClientProtocol) rpc.getProxy(
+          ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
+
+      try {
+        unknownProxy.getNewApplication(Records
+            .newRecord(GetNewApplicationRequest.class));
+        Assert.fail("Expected RPC call to fail with unknown method.");
+      } catch (YarnException e) {
+        Assert.assertTrue(e.getMessage().matches(
+            "Unknown method getNewApplication called on.*"
+                + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
+                + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
+      } catch (Exception e) {
+        // NOTE(review): any other exception type is only printed, so this
+        // step passes without checking the message in that case - confirm
+        // this leniency is intended before tightening it.
+        e.printStackTrace();
+      }
+
+      // Test AggregatorNodemanagerProtocol get proper response
+      AggregatorNodemanagerProtocol proxy = (AggregatorNodemanagerProtocol)rpc.getProxy(
+          AggregatorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf);
+      // Verify that a request with DEFAULT_APP_ID and DEFAULT_AGGREGATOR_ADDR
+      // gets a normal response.
+      try {
+        ReportNewAggregatorsInfoRequest request =
+            ReportNewAggregatorsInfoRequest.newInstance(
+                DEFAULT_APP_ID, DEFAULT_AGGREGATOR_ADDR);
+        proxy.reportNewAggregatorInfo(request);
+      } catch (YarnException e) {
+        Assert.fail("RPC call failure is not expected here.");
+      }
+
+      // Verify that an empty request gets a YarnException back (by design in
+      // DummyNMAggregatorService)
+      try {
+        proxy.reportNewAggregatorInfo(Records
+            .newRecord(ReportNewAggregatorsInfoRequest.class));
+        Assert.fail("Expected RPC call to fail with YarnException.");
+      } catch (YarnException e) {
+        Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
+      }
+    } finally {
+      server.stop();
+    }
+  }
+
+  /**
+   * Runs the shared RPC scenario in {@code test(String)} with
+   * {@link HadoopYarnProtoRPC} as the RPC implementation class.
+   */
+  @Test
+  public void testHadoopProtoRPC() throws Exception {
+    final String rpcImplClassName = HadoopYarnProtoRPC.class.getName();
+    test(rpcImplClassName);
+  }
+  
+  /**
+   * Runs a start / status / stop round trip against a
+   * {@code DummyContainerManager} using the given RPC implementation.
+   *
+   * <p>The scenario expects {@code stopContainers} to fail with a
+   * {@link YarnException} whose message contains both
+   * {@link #EXCEPTION_MSG} and {@link #EXCEPTION_CAUSE}, and expects the
+   * first reported container status to be {@link ContainerState#RUNNING}.
+   *
+   * @param rpcClass fully qualified class name stored under
+   *        {@link YarnConfiguration#IPC_RPC_IMPL}
+   * @throws Exception if any RPC call fails in an unexpected way; the
+   *         original exception is propagated so the real cause is reported
+   */
+  private void test(String rpcClass) throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    Server server = rpc.getServer(ContainerManagementProtocol.class,
+        new DummyContainerManager(), addr, conf, null, 1);
+    server.start();
+
+    List<ContainerStatus> statuses;
+    boolean exception = false;
+    // Everything after start() runs under try/finally so the server is
+    // stopped even when one of the set-up calls throws.
+    try {
+      RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class);
+      ContainerManagementProtocol proxy = (ContainerManagementProtocol)
+          rpc.getProxy(ContainerManagementProtocol.class,
+              NetUtils.getConnectAddress(server), conf);
+      ContainerLaunchContext containerLaunchContext =
+          recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+      ApplicationId applicationId = ApplicationId.newInstance(0, 0);
+      ApplicationAttemptId applicationAttemptId =
+          ApplicationAttemptId.newInstance(applicationId, 0);
+      ContainerId containerId =
+          ContainerId.newContainerId(applicationAttemptId, 100);
+      NodeId nodeId = NodeId.newInstance("localhost", 1234);
+      Resource resource = Resource.newInstance(1234, 2);
+      ContainerTokenIdentifier containerTokenIdentifier =
+          new ContainerTokenIdentifier(containerId, "localhost", "user",
+            resource, System.currentTimeMillis() + 10000, 42, 42,
+            Priority.newInstance(0), 0);
+      Token containerToken = newContainerToken(nodeId, "password".getBytes(),
+            containerTokenIdentifier);
+
+      StartContainerRequest scRequest =
+          StartContainerRequest.newInstance(containerLaunchContext,
+            containerToken);
+      List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+      list.add(scRequest);
+      StartContainersRequest allRequests =
+          StartContainersRequest.newInstance(list);
+      proxy.startContainers(allRequests);
+
+      List<ContainerId> containerIds = new ArrayList<ContainerId>();
+      containerIds.add(containerId);
+      GetContainerStatusesRequest gcsRequest =
+          GetContainerStatusesRequest.newInstance(containerIds);
+      GetContainerStatusesResponse response =
+          proxy.getContainerStatuses(gcsRequest);
+      statuses = response.getContainerStatuses();
+
+      //test remote exception
+      try {
+        StopContainersRequest stopRequest =
+            recordFactory.newRecordInstance(StopContainersRequest.class);
+        stopRequest.setContainerIds(containerIds);
+        proxy.stopContainers(stopRequest);
+      } catch (YarnException e) {
+        exception = true;
+        Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG));
+        Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE));
+        System.out.println("Test Exception is " + e.getMessage());
+      }
+      // Any other exception type is deliberately not caught: it propagates
+      // with its stack trace instead of being printed and then masked by the
+      // assertTrue(exception) check below.
+    } finally {
+      server.stop();
+    }
+    Assert.assertTrue(exception);
+    Assert.assertNotNull(statuses.get(0));
+    Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState());
+  }
+
+  /** In-memory {@link ContainerManagementProtocol} stub backing the RPC test. */
+  public class DummyContainerManager implements ContainerManagementProtocol {
+    // One RUNNING status is appended per start request received.
+    private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+
+    /** Returns every status recorded so far; the request is not inspected. */
+    @Override
+    public GetContainerStatusesResponse getContainerStatuses(
+        GetContainerStatusesRequest request)
+    throws YarnException {
+      GetContainerStatusesResponse result =
+          recordFactory.newRecordInstance(GetContainerStatusesResponse.class);
+      result.setContainerStatuses(statuses);
+      return result;
+    }
+
+    /** Records a RUNNING status for the container named by each token. */
+    @Override
+    public StartContainersResponse startContainers(
+        StartContainersRequest requests) throws YarnException {
+      StartContainersResponse result =
+          recordFactory.newRecordInstance(StartContainersResponse.class);
+      for (StartContainerRequest request : requests.getStartContainerRequests()) {
+        ContainerTokenIdentifier tokenId;
+        try {
+          tokenId = newContainerTokenIdentifier(request.getContainerToken());
+        } catch (IOException e) {
+          throw RPCUtil.getRemoteException(e);
+        }
+        ContainerStatus running =
+            recordFactory.newRecordInstance(ContainerStatus.class);
+        running.setState(ContainerState.RUNNING);
+        running.setContainerId(tokenId.getContainerID());
+        running.setExitStatus(0);
+        statuses.add(running);
+      }
+      return result;
+    }
+
+    /** Always fails, wrapping EXCEPTION_MSG with cause EXCEPTION_CAUSE. */
+    @Override
+    public StopContainersResponse stopContainers(StopContainersRequest request)
+    throws YarnException {
+      Exception cause = new Exception(EXCEPTION_CAUSE);
+      throw new YarnException(new Exception(EXCEPTION_MSG, cause));
+    }
+  }
+
+  /** Decodes the identifier carried by the given container token. */
+  public static ContainerTokenIdentifier newContainerTokenIdentifier(
+      Token containerToken) throws IOException {
+    byte[] identifier = containerToken.getIdentifier().array();
+    byte[] password = containerToken.getPassword().array();
+    Text kind = new Text(containerToken.getKind());
+    Text service = new Text(containerToken.getService());
+    return new org.apache.hadoop.security.token.Token<ContainerTokenIdentifier>(
+        identifier, password, kind, service).decodeIdentifier();
+  }
+
+  /** Builds a container token whose service is derived from the node address. */
+  public static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress nodeAddr =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    byte[] identifier = tokenIdentifier.getBytes();
+    String kind = ContainerTokenIdentifier.KIND.toString();
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    String service = SecurityUtil.buildTokenService(nodeAddr).toString();
+    return Token.newInstance(identifier, kind, password, service);
+  }
+  
+  // A dummy implementation of AggregatorNodemanagerProtocol for test purposes;
+  // it accepts exactly one (appID, aggregatorAddr) pair and throws otherwise.
+  public class DummyNMAggregatorService
+      implements AggregatorNodemanagerProtocol {
+
+    /**
+     * Verifies the single reported (appID, aggregatorAddr) pair is the default.
+     * @throws YarnException if the request does not hold exactly one entry
+     */
+    @Override
+    public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+        ReportNewAggregatorsInfoRequest request)
+        throws YarnException, IOException {
+      List<AppAggregatorsMap> appAggregators = request.getAppAggregatorsList();
+      if (appAggregators.size() != 1) {
+        throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
+      }
+      // JUnit takes the expected value first, so failures report it correctly.
+      AppAggregatorsMap appAggregator = appAggregators.get(0);
+      Assert.assertEquals(DEFAULT_APP_ID, appAggregator.getApplicationId());
+      Assert.assertEquals(DEFAULT_AGGREGATOR_ADDR,
+          appAggregator.getAggregatorAddr());
+      return recordFactory.newRecordInstance(
+          ReportNewAggregatorsInfoResponse.class);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index 20983b6..47cf8ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -89,11 +91,14 @@ public class TestYarnServerApiClasses {
     original.setLastKnownContainerTokenMasterKey(getMasterKey());
     original.setLastKnownNMTokenMasterKey(getMasterKey());
     original.setNodeStatus(getNodeStatus());
+    Map<ApplicationId, String> aggregators = getAggregators();
+    original.setRegisteredAggregators(aggregators);
     NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
         original.getProto());
     assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
     assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
+    assertEquals(aggregators, copy.getRegisteredAggregators());
   }
 
   /**
@@ -110,6 +115,8 @@ public class TestYarnServerApiClasses {
     original.setNextHeartBeatInterval(1000);
     original.setNodeAction(NodeAction.NORMAL);
     original.setResponseId(100);
+    Map<ApplicationId, String> aggregators = getAggregators();
+    original.setAppAggregatorsMap(aggregators);
 
     NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
         original.getProto());
@@ -119,6 +126,7 @@ public class TestYarnServerApiClasses {
     assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
     assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
+    assertEquals(aggregators, copy.getAppAggregatorsMap());
   }
 
   /**
@@ -208,6 +216,15 @@ public class TestYarnServerApiClasses {
 
   }
 
+  /** Builds the single-entry app-to-aggregator-address map used as fixture. */
+  private Map<ApplicationId, String> getAggregators() {
+    Map<ApplicationId, String> singleEntry =
+        new HashMap<ApplicationId, String>();
+    // Arbitrary but fixed values; the tests only compare maps for equality.
+    singleEntry.put(ApplicationId.newInstance(1L, 1), "localhost:0");
+    return singleEntry;
+  }
+  
   private ContainerStatus getContainerStatus(int applicationId,
       int containerID, int appAttemptId) {
     ContainerStatus status = recordFactory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 6e7e2ec..85f3f0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -57,6 +57,19 @@ public interface Context {
   ConcurrentMap<ApplicationId, Application> getApplications();
 
   Map<ApplicationId, Credentials> getSystemCredentialsForApps();
+  
+  /**
+   * Get the aggregators that are registered on this NM.
+   * @return registered aggregators, keyed by application ID
+   */
+  Map<ApplicationId, String> getRegisteredAggregators();
+  
+  /**
+   * Return the known aggregators, as obtained from the RM, for all active
+   * applications running on this NM.
+   * @return known aggregators.
+   */
+  Map<ApplicationId, String> getKnownAggregators();
 
   ConcurrentMap<ContainerId, Container> getContainers();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a4be120..10143db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.aggregatormanager.NMAggregatorService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -84,8 +85,9 @@ public class NodeManager extends CompositeService
   private Context context;
   private AsyncDispatcher dispatcher;
   private ContainerManagerImpl containerManager;
+  private NMAggregatorService nmAggregatorService;
   private NodeStatusUpdater nodeStatusUpdater;
-  private static CompositeServiceShutdownHook nodeManagerShutdownHook; 
+  private static CompositeServiceShutdownHook nodeManagerShutdownHook;
   private NMStateStoreService nmStore = null;
   
   private AtomicBoolean isStopping = new AtomicBoolean(false);
@@ -112,6 +114,10 @@ public class NodeManager extends CompositeService
     return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
       metrics, aclsManager, dirsHandler);
   }
+  
+  protected NMAggregatorService createNMAggregatorService(Context context) {
+    return new NMAggregatorService(context);
+  }
 
   protected WebServer createWebServer(Context nmContext,
       ResourceView resourceView, ApplicationACLsManager aclsManager,
@@ -268,6 +274,9 @@ public class NodeManager extends CompositeService
     addService(dispatcher);
     
     DefaultMetricsSystem.initialize("NodeManager");
+    
+    this.nmAggregatorService = createNMAggregatorService(context);
+    addService(nmAggregatorService);
 
     // StatusUpdater should be added last so that it get started last 
     // so that we make sure everything is up before registering with RM. 
@@ -345,6 +354,12 @@ public class NodeManager extends CompositeService
 
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
+    
+    protected Map<ApplicationId, String> registeredAggregators =
+        new ConcurrentHashMap<ApplicationId, String>();
+    
+    protected Map<ApplicationId, String> knownAggregators =
+        new ConcurrentHashMap<ApplicationId, String>();
 
     private final NMContainerTokenSecretManager containerTokenSecretManager;
     private final NMTokenSecretManagerInNM nmTokenSecretManager;
@@ -460,6 +475,30 @@ public class NodeManager extends CompositeService
         Map<ApplicationId, Credentials> systemCredentials) {
       this.systemCredentials = systemCredentials;
     }
+    
+    @Override
+    public Map<ApplicationId, String> getRegisteredAggregators() {
+      return this.registeredAggregators;
+    }
+
+    /** Records aggregators that newly registered with this NM. */
+    public void addRegisteredAggregators(
+        Map<ApplicationId, String> newRegisteredAggregators) {
+      registeredAggregators.putAll(newRegisteredAggregators);
+      // Also known at once, so this NM's TimelineClient can consume them.
+      knownAggregators.putAll(newRegisteredAggregators);
+    }
+    
+    @Override
+    public Map<ApplicationId, String> getKnownAggregators() {
+      return this.knownAggregators;
+    }
+
+    public void addKnownAggregators(
+        Map<ApplicationId, String> knownAggregators) {
+      this.knownAggregators.putAll(knownAggregators);
+    }
+    
   }
 
 
@@ -523,6 +562,11 @@ public class NodeManager extends CompositeService
   public Context getNMContext() {
     return this.context;
   }
+  
+  // For testing
+  NMAggregatorService getNMAggregatorService() {
+    return this.nmAggregatorService;
+  }
 
   public static void main(String[] args) throws IOException {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 6ddd7e4..c855833 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -592,7 +592,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                   NodeStatusUpdaterImpl.this.context
                     .getContainerTokenSecretManager().getCurrentKey(),
                   NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
-                    .getCurrentKey());
+                    .getCurrentKey(),
+                  NodeStatusUpdaterImpl.this.context.getRegisteredAggregators());
             response = resourceTracker.nodeHeartbeat(request);
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -655,6 +656,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               ((NMContext) context)
                 .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
             }
+            
+            Map<ApplicationId, String> knownAggregators = response.getAppAggregatorsMap();
+            ((NodeManager.NMContext)context).addKnownAggregators(knownAggregators);
+            
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
             dispatcher.getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
new file mode 100644
index 0000000..17150ba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.aggregatormanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.CompositeService;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+
+/** NM RPC endpoint that records newly reported app aggregator addresses. */
+public class NMAggregatorService extends CompositeService implements
+    AggregatorNodemanagerProtocol {
+
+  private static final Log LOG = LogFactory.getLog(NMAggregatorService.class);
+
+  final Context context;
+
+  private Server server;
+
+  public NMAggregatorService(Context context) {
+    super(NMAggregatorService.class.getName());
+    this.context = context;
+  }
+
+  /** Starts the RPC server on the configured address, then the superclass. */
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    InetSocketAddress bindAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
+    Configuration serverConf = new Configuration(conf);
+
+    // TODO Security settings.
+    YarnRPC rpc = YarnRPC.create(conf);
+    int threadCount = conf.getInt(
+        YarnConfiguration.NM_AGGREGATOR_SERVICE_THREAD_COUNT,
+        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT);
+    server = rpc.getServer(AggregatorNodemanagerProtocol.class, this,
+        bindAddress, serverConf, this.context.getNMTokenSecretManager(),
+        threadCount);
+    server.start();
+
+    // start remaining services
+    super.serviceStart();
+    LOG.info("NMAggregatorService started at " + bindAddress);
+  }
+
+  /** Stops the RPC server, if one was created, then the superclass. */
+  @Override
+  public void serviceStop() throws Exception {
+    if (server != null) {
+      server.stop();
+    }
+    // TODO may cleanup app aggregators running on this NM in future.
+    super.serviceStop();
+  }
+
+  /**
+   * Registers every (application, aggregator address) pair of the request
+   * in the NM context; a null or empty list is accepted and changes nothing.
+   */
+  @Override
+  public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+      ReportNewAggregatorsInfoRequest request) throws IOException {
+    List<AppAggregatorsMap> reported = request.getAppAggregatorsList();
+    if (reported == null || reported.isEmpty()) {
+      return ReportNewAggregatorsInfoResponse.newInstance();
+    }
+    Map<ApplicationId, String> addrByApp = new HashMap<ApplicationId, String>();
+    for (AppAggregatorsMap entry : reported) {
+      addrByApp.put(entry.getApplicationId(), entry.getAggregatorAddr());
+    }
+    ((NodeManager.NMContext) context).addRegisteredAggregators(addrByApp);
+    return ReportNewAggregatorsInfoResponse.newInstance();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index a73b113..6bf3bbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -425,6 +425,10 @@ public class ApplicationImpl implements Application {
           new LogHandlerAppFinishedEvent(app.appId));
 
       app.context.getNMTokenSecretManager().appFinished(app.getAppId());
+      // Remove aggregator info for finished apps.
+      // TODO check we remove related aggregators info in failure cases (YARN-3038)
+      app.context.getRegisteredAggregators().remove(app.getAppId());
+      app.context.getKnownAggregators().remove(app.getAppId());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 1c7f987..2eb1a7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -344,6 +344,8 @@ public class ApplicationMasterService extends AbstractService implements
 
     RMApp rmApp =
         rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
+    // Remove the aggregator address when the app finishes.
+    rmApp.removeAggregatorAddr();
     // checking whether the app exits in RMStateStore at first not to throw
     // ApplicationDoesNotExistInCacheException before and after
     // RM work-preserving restart.
@@ -576,6 +578,10 @@ public class ApplicationMasterService extends AbstractService implements
       allocateResponse.setAvailableResources(allocation.getResourceLimit());
 
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+      
+      // add aggregator address for this application
+      allocateResponse.setAggregatorAddr(
+          this.rmContext.getRMApps().get(applicationId).getAggregatorAddr());
 
       // add preemption to the allocateResponse message (if any)
       allocateResponse

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 0de556b..f163a28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
@@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -406,6 +410,11 @@ public class ResourceTrackerService extends AbstractService implements
           new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
       return resync;
     }
+    
+    // Check & update aggregators info from request.
+    // TODO make sure there is no race condition in the AM failover case,
+    // where the older registration could possibly override the newer one.
+    updateAppAggregatorsMap(request);
 
     // Heartbeat response
     NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@@ -421,15 +430,72 @@ public class ResourceTrackerService extends AbstractService implements
     if (!systemCredentials.isEmpty()) {
       nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
     }
+    
+    // Return aggregators' map that NM needs to know
+    // TODO we should optimize this to only include aggregator info that NM
+    // doesn't know yet.
+    List<ApplicationId> keepAliveApps = remoteNodeStatus.getKeepAliveApplications();
+    if (keepAliveApps != null) {
+      setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
+    }
 
     // 4. Send status to RMNode, saving the latest response.
     this.rmContext.getDispatcher().getEventHandler().handle(
         new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
             remoteNodeStatus.getContainersStatuses(), 
-            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+            keepAliveApps, nodeHeartBeatResponse));
 
     return nodeHeartBeatResponse;
   }
+  
+  /** Adds the known aggregator address of each live app to the response. */
+  private void setAppAggregatorsMapToResponse(
+      List<ApplicationId> liveApps, NodeHeartbeatResponse response) {
+    Map<ApplicationId, String> liveAppAggregatorsMap =
+        new ConcurrentHashMap<ApplicationId, String>();
+    Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
+    for (ApplicationId appId : liveApps) {
+      // The NM may report apps the RM no longer tracks; do not NPE on them.
+      RMApp rmApp = rmApps.get(appId);
+      String aggregatorAddr = rmApp == null ? null : rmApp.getAggregatorAddr();
+      if (aggregatorAddr != null) {
+        liveAppAggregatorsMap.put(appId, aggregatorAddr);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("Aggregator for applicaton: " + appId + " hasn't registered yet!");
+      }
+    }
+    response.setAppAggregatorsMap(liveAppAggregatorsMap);
+  }
+  
+  /**
+   * Sends an RMAppAggregatorUpdateEvent for each non-empty aggregator address
+   * in the heartbeat request that differs from the one recorded on its RMApp.
+   */
+  private void updateAppAggregatorsMap(NodeHeartbeatRequest request) {
+    Map<ApplicationId, String> registeredAggregatorsMap =
+        request.getRegisteredAggregators();
+    if (registeredAggregatorsMap == null) {
+      return;
+    }
+    Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
+    for (Map.Entry<ApplicationId, String> entry :
+        registeredAggregatorsMap.entrySet()) {
+      ApplicationId appId = entry.getKey();
+      String aggregatorAddr = entry.getValue();
+      if (aggregatorAddr == null || aggregatorAddr.isEmpty()) {
+        continue;
+      }
+      RMApp rmApp = rmApps.get(appId);
+      if (rmApp == null) {
+        LOG.warn("Cannot update aggregator info because application ID: " +
+            appId + " is not found in RMContext!");
+      } else if (!aggregatorAddr.equals(rmApp.getAggregatorAddr())) {
+        // Compare by value: equal addresses are usually distinct objects.
+        rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppAggregatorUpdateEvent(appId, aggregatorAddr));
+      }
+    }
+  }
 
   private void populateKeys(NodeHeartbeatRequest request,
       NodeHeartbeatResponse nodeHeartBeatResponse) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fbcaab9..f81edb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -172,6 +172,23 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * @return the tracking url for the application master.
    */
   String getTrackingUrl();
+  
+  /**
+   * The address of the aggregator associated with this application.
+   * @return the aggregator address (RMAppImpl returns null when none is set).
+   */
+  String getAggregatorAddr();
+  
+  /**
+   * Set the aggregator address for the application.
+   * @param aggregatorAddr the address of the application's aggregator
+   */
+  void setAggregatorAddr(String aggregatorAddr);
+  
+  /**
+   * Remove the aggregator address when the application is finished or killed.
+   */
+  void removeAggregatorAddr();
 
   /**
    * The original tracking url for the application master.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
new file mode 100644
index 0000000..b43de44
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/** AGGREGATOR_UPDATE event carrying an application's aggregator address. */
+public class RMAppAggregatorUpdateEvent extends RMAppEvent {
+  private final String appAggregatorAddr;
+
+  public RMAppAggregatorUpdateEvent(ApplicationId appId,
+      String appAggregatorAddr) {
+    super(appId, RMAppEventType.AGGREGATOR_UPDATE);
+    this.appAggregatorAddr = appAggregatorAddr;
+  }
+
+  public String getAppAggregatorAddr() {
+    return appAggregatorAddr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index 668c5e1..6e9460a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -30,6 +30,9 @@ public enum RMAppEventType {
 
   // Source: Scheduler
   APP_ACCEPTED,
+  
+  // TODO add source later
+  AGGREGATOR_UPDATE,
 
   // Source: RMAppAttempt
   ATTEMPT_REGISTERED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2d1737a..6a076ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -134,6 +134,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private long startTime;
   private long finishTime = 0;
   private long storedFinishTime = 0;
+  private String aggregatorAddr;
   // This field isn't protected by readlock now.
   private volatile RMAppAttempt currentAttempt;
   private String queue;
@@ -165,6 +166,8 @@ public class RMAppImpl implements RMApp, Recoverable {
      // Transitions from NEW state
     .addTransition(RMAppState.NEW, RMAppState.NEW,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.NEW, RMAppState.NEW,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
         RMAppEventType.START, new RMAppNewlySavingTransition())
     .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@@ -181,6 +184,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     // Transitions from NEW_SAVING state
     .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
         RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
     .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@@ -199,6 +204,8 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
+    .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
         RMAppEventType.APP_REJECTED,
         new FinalSavingTransition(
@@ -215,6 +222,8 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
         RMAppEventType.MOVE, new RMAppMoveTransition())
+    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
         RMAppEventType.ATTEMPT_REGISTERED)
     .addTransition(RMAppState.ACCEPTED,
@@ -241,6 +250,8 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.MOVE, new RMAppMoveTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_UNREGISTERED,
         new FinalSavingTransition(
@@ -270,6 +281,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
+    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
         EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -281,6 +294,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
+    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
       EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -292,6 +307,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
+    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+        RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
     .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
         RMAppEventType.ATTEMPT_KILLED,
         new FinalSavingTransition(
@@ -488,6 +505,21 @@ public class RMAppImpl implements RMApp, Recoverable {
   public void setQueue(String queue) {
     this.queue = queue;
   }
+  
+  @Override
+  public String getAggregatorAddr() {
+    return this.aggregatorAddr; // null until set, and again after removal
+  }
+  
+  @Override
+  public void setAggregatorAddr(String aggregatorAddr) {
+    this.aggregatorAddr = aggregatorAddr; // overwrites any previous address
+  }
+  
+  @Override
+  public void removeAggregatorAddr() {
+    this.aggregatorAddr = null; // getAggregatorAddr() returns null afterwards
+  }
 
   @Override
   public String getName() {
@@ -737,6 +769,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.diagnostics.append(appState.getDiagnostics());
     this.storedFinishTime = appState.getFinishTime();
     this.startTime = appState.getStartTime();
+    //TODO recover aggregator address.
+    //this.aggregatorAddr = appState.getAggregatorAddr();
 
     for(int i=0; i<appState.getAttemptCount(); ++i) {
       // create attempt
@@ -778,9 +812,24 @@ public class RMAppImpl implements RMApp, Recoverable {
       SingleArcTransition<RMAppImpl, RMAppEvent> {
     public void transition(RMAppImpl app, RMAppEvent event) {
     };
-
   }
 
+  /** Records the aggregator address carried by an AGGREGATOR_UPDATE event. */
+  private static final class RMAppAggregatorUpdateTransition
+      extends RMAppTransition {
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      LOG.info("Updating aggregator info for app: " + app.getApplicationId());
+
+      // The state machine registers this transition for AGGREGATOR_UPDATE only.
+      String reportedAddr =
+          ((RMAppAggregatorUpdateEvent) event).getAppAggregatorAddr();
+      app.setAggregatorAddr(reportedAddr);
+
+      // TODO: persist the aggregator address to RMStateStore so that it can
+      // be recovered.
+    }
+  }
+  
   private static final class RMAppNodeUpdateTransition extends RMAppTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
       RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index f8d92aa..0d0895a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -92,6 +92,18 @@ public abstract class MockAsm extends MockApps {
       throw new UnsupportedOperationException("Not supported yet.");
     }
     @Override
+    public String getAggregatorAddr() { // stub: not implemented by this mock
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    @Override
+    public void setAggregatorAddr(String aggregatorAddr) { // stub: always throws
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    @Override
+    public void removeAggregatorAddr() { // stub: not implemented by this mock
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+    @Override
     public ApplicationId getApplicationId() {
       throw new UnsupportedOperationException("Not supported yet.");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index ec990f9..96952d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -271,4 +271,19 @@ public class MockRMApp implements RMApp {
   public ResourceRequest getAMResourceRequest() {
     return this.amReq; 
   }
+
+  @Override
+  public String getAggregatorAddr() { // stub: not implemented by this mock
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+  
+  @Override
+  public void removeAggregatorAddr() { // stub: not implemented by this mock
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
+  public void setAggregatorAddr(String aggregatorAddr) { // stub: always throws
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
index cdc4e35..19920fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
@@ -94,10 +94,9 @@ public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
    * @return whether it was added successfully
    */
   public boolean addApplication(ApplicationId appId) {
-    String appIdString = appId.toString();
     AppLevelTimelineAggregator aggregator =
-        new AppLevelTimelineAggregator(appIdString);
-    return (aggregatorCollection.putIfAbsent(appIdString, aggregator)
+        new AppLevelTimelineAggregator(appId.toString());
+    return (aggregatorCollection.putIfAbsent(appId, aggregator) // the one now registered for appId
         == aggregator);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
index 73b6d52..d6e2a18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.aggregator;
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,9 +32,15 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -62,6 +70,12 @@ public class TimelineAggregatorsCollection extends CompositeService {
 
   // REST server for this aggregator collection
   private HttpServer2 timelineRestServer;
+  
+  private String timelineRestServerBindAddress;
+  
+  private AggregatorNodemanagerProtocol nmAggregatorService;
+  
+  private InetSocketAddress nmAggregatorServiceAddress;
 
   static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
 
@@ -74,6 +88,16 @@ public class TimelineAggregatorsCollection extends CompositeService {
   }
 
   @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    this.nmAggregatorServiceAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
+    super.serviceInit(conf); // let CompositeService init its child services
+  }
+  
+  @Override
   protected void serviceStart() throws Exception {
     startWebApp();
     super.serviceStart();
@@ -95,9 +119,13 @@ public class TimelineAggregatorsCollection extends CompositeService {
    * starting the app level service
    * @return the aggregator associated with id after the potential put.
    */
-  public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) {
+  public TimelineAggregator putIfAbsent(ApplicationId appId, 
+      TimelineAggregator aggregator) {
+    String id = appId.toString();
+    TimelineAggregator aggregatorInTable;
+    boolean aggregatorIsNew = false;
     synchronized (aggregators) {
-      TimelineAggregator aggregatorInTable = aggregators.get(id);
+      aggregatorInTable = aggregators.get(id);
       if (aggregatorInTable == null) {
         try {
           // initialize, start, and add it to the collection so it can be
@@ -106,16 +134,30 @@ public class TimelineAggregatorsCollection extends CompositeService {
           aggregator.start();
           aggregators.put(id, aggregator);
           LOG.info("the aggregator for " + id + " was added");
-          return aggregator;
+          aggregatorInTable = aggregator;
+          aggregatorIsNew = true;
         } catch (Exception e) {
           throw new YarnRuntimeException(e);
         }
       } else {
         String msg = "the aggregator for " + id + " already exists!";
         LOG.error(msg);
-        return aggregatorInTable;
+      }
+      
+    }
+    // Report to NM if a new aggregator is added.
+    if (aggregatorIsNew) {
+      try {
+        reportNewAggregatorToNM(appId);
+      } catch (Exception e) {
+        // Rethrow: the aggregator cannot be used if reporting to the NM fails.
+        LOG.error("Failed to report a new aggregator for application: " + appId + 
+            " to NM Aggregator Services.");
+        throw new YarnRuntimeException(e);
       }
     }
+    
+    return aggregatorInTable;
   }
 
   /**
@@ -167,7 +209,10 @@ public class TimelineAggregatorsCollection extends CompositeService {
     String bindAddress = WebAppUtils.getWebAppBindURL(conf,
         YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
         WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
-    LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
+    this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
+        NetUtils.createSocketAddr(bindAddress));
+    LOG.info("Instantiating the per-node aggregator webapp at " + 
+        timelineRestServerBindAddress);
     try {
       Configuration confForInfoServer = new Configuration(conf);
       confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
@@ -200,4 +245,27 @@ public class TimelineAggregatorsCollection extends CompositeService {
       throw new YarnRuntimeException(msg, e);
     }
   }
+  
+  /** Reports appId and the REST bind address to the NM aggregator service. */
+  private void reportNewAggregatorToNM(ApplicationId appId)
+      throws YarnException, IOException {
+    nmAggregatorService = getNMAggregatorService();
+    ReportNewAggregatorsInfoRequest report = ReportNewAggregatorsInfoRequest
+        .newInstance(appId, timelineRestServerBindAddress);
+    LOG.info("Report a new aggregator for application: " + appId
+        + " to NM Aggregator Services.");
+    nmAggregatorService.reportNewAggregatorInfo(report);
+  }
+  
+  /** Creates an RPC proxy to the NM aggregator service; protected for test. */
+  protected AggregatorNodemanagerProtocol getNMAggregatorService() {
+    final Configuration rpcConf = getConfig();
+    // TODO Security settings.
+    Object nmProxy = YarnRPC.create(rpcConf).getProxy(
+        AggregatorNodemanagerProtocol.class,
+        nmAggregatorServiceAddress,
+        rpcConf);
+    return (AggregatorNodemanagerProtocol) nmProxy;
+  }
+  
 }