You are viewing a plain text version of this content. The canonical link for this message is available from the Apache mailing-list archive (the hyperlink is not preserved in this plain-text rendering).
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/18 04:34:41 UTC
[49/50] hadoop git commit: YARN-3039. Implemented the app-level
timeline aggregator discovery service. Contributed by Junping Du.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
new file mode 100644
index 0000000..eb7beef
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoRequestPBImpl.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppAggregatorsMapPBImpl;
+
+// Protobuf-backed implementation of ReportNewAggregatorsInfoRequest.
+// Uses the standard YARN "viaProto" pattern: the record holds either an
+// immutable proto message (viaProto == true) or a mutable builder; locally
+// cached state (aggregatorsList) takes precedence and is merged back into
+// the proto when getProto() is called.
+public class ReportNewAggregatorsInfoRequestPBImpl extends
+ ReportNewAggregatorsInfoRequest {
+
+ ReportNewAggregatorsInfoRequestProto proto =
+ ReportNewAggregatorsInfoRequestProto.getDefaultInstance();
+
+ ReportNewAggregatorsInfoRequestProto.Builder builder = null;
+ boolean viaProto = false;
+
+ // Lazily-deserialized view of the repeated app_aggregators proto field;
+ // null until first read via getAppAggregatorsList() or an explicit set.
+ private List<AppAggregatorsMap> aggregatorsList = null;
+
+ public ReportNewAggregatorsInfoRequestPBImpl() {
+ builder = ReportNewAggregatorsInfoRequestProto.newBuilder();
+ }
+
+ public ReportNewAggregatorsInfoRequestPBImpl(
+ ReportNewAggregatorsInfoRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ // Flushes any local modifications into the proto and returns the
+ // resulting immutable message.
+ public ReportNewAggregatorsInfoRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ // Equality/hashCode delegate to the serialized proto, the convention
+ // followed by the other YARN PB record implementations.
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (aggregatorsList != null) {
+ addLocalAggregatorsToProto();
+ }
+ }
+
+ // Ensures a mutable builder exists, seeding it from the current proto
+ // when we were previously in proto-only mode.
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ReportNewAggregatorsInfoRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ // Replaces the builder's repeated app_aggregators field with the locally
+ // cached list (clear-then-add, so removals are reflected too).
+ private void addLocalAggregatorsToProto() {
+ maybeInitBuilder();
+ builder.clearAppAggregators();
+ List<AppAggregatorsMapProto> protoList =
+ new ArrayList<AppAggregatorsMapProto>();
+ for (AppAggregatorsMap m : this.aggregatorsList) {
+ protoList.add(convertToProtoFormat(m));
+ }
+ builder.addAllAppAggregators(protoList);
+ }
+
+ // Deserializes the repeated proto field into the local record list.
+ private void initLocalAggregatorsList() {
+ ReportNewAggregatorsInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List<AppAggregatorsMapProto> aggregatorsList =
+ p.getAppAggregatorsList();
+ this.aggregatorsList = new ArrayList<AppAggregatorsMap>();
+ for (AppAggregatorsMapProto m : aggregatorsList) {
+ this.aggregatorsList.add(convertFromProtoFormat(m));
+ }
+ }
+
+ @Override
+ public List<AppAggregatorsMap> getAppAggregatorsList() {
+ if (this.aggregatorsList == null) {
+ initLocalAggregatorsList();
+ }
+ return this.aggregatorsList;
+ }
+
+ // A null argument clears the proto field; otherwise the list is cached
+ // locally and only serialized on the next getProto().
+ @Override
+ public void setAppAggregatorsList(List<AppAggregatorsMap> appAggregatorsList) {
+ maybeInitBuilder();
+ if (appAggregatorsList == null) {
+ builder.clearAppAggregators();
+ }
+ this.aggregatorsList = appAggregatorsList;
+ }
+
+ private AppAggregatorsMapPBImpl convertFromProtoFormat(
+ AppAggregatorsMapProto p) {
+ return new AppAggregatorsMapPBImpl(p);
+ }
+
+ // Assumes every record in the list is a PBImpl instance; a non-PB
+ // implementation would throw ClassCastException here.
+ private AppAggregatorsMapProto convertToProtoFormat(
+ AppAggregatorsMap m) {
+ return ((AppAggregatorsMapPBImpl) m).getProto();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
new file mode 100644
index 0000000..0f0925a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewAggregatorsInfoResponsePBImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+
+import com.google.protobuf.TextFormat;
+
+// Protobuf-backed implementation of ReportNewAggregatorsInfoResponse.
+// The response message carries no fields, so there is no local state to
+// merge: getProto() only finalizes the builder.
+@Private
+@Unstable
+public class ReportNewAggregatorsInfoResponsePBImpl extends
+ ReportNewAggregatorsInfoResponse {
+
+ ReportNewAggregatorsInfoResponseProto proto =
+ ReportNewAggregatorsInfoResponseProto.getDefaultInstance();
+
+ ReportNewAggregatorsInfoResponseProto.Builder builder = null;
+
+ boolean viaProto = false;
+
+ public ReportNewAggregatorsInfoResponsePBImpl() {
+ builder = ReportNewAggregatorsInfoResponseProto.newBuilder();
+ }
+
+ public ReportNewAggregatorsInfoResponsePBImpl(ReportNewAggregatorsInfoResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ReportNewAggregatorsInfoResponseProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ // Equality delegates to the underlying proto, as in the other YARN PB
+ // record implementations.
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
new file mode 100644
index 0000000..67c377d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppAggregatorsMap.java
@@ -0,0 +1,33 @@
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+
+// Abstract record mapping an application to the address of its timeline
+// aggregator. Concrete instances are produced by the records factory
+// (see AppAggregatorsMapPBImpl).
+// NOTE(review): unlike the other new files in this patch, this file has no
+// ASF license header above the package declaration — it should be added.
+@Private
+public abstract class AppAggregatorsMap {
+
+ // Factory method: builds a record via Records.newRecord and populates
+ // both fields.
+ public static AppAggregatorsMap newInstance(
+ ApplicationId id, String aggregatorAddr) {
+ AppAggregatorsMap appAggregatorMap =
+ Records.newRecord(AppAggregatorsMap.class);
+ appAggregatorMap.setApplicationId(id);
+ appAggregatorMap.setAggregatorAddr(aggregatorAddr);
+ return appAggregatorMap;
+ }
+
+ public abstract ApplicationId getApplicationId();
+
+ public abstract void setApplicationId(
+ ApplicationId id);
+
+ // Address of the app-level aggregator, e.g. "host:port" — TODO confirm
+ // exact format against callers.
+ public abstract String getAggregatorAddr();
+
+ public abstract void setAggregatorAddr(
+ String addr);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
new file mode 100644
index 0000000..32903e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppAggregatorsMapPBImpl.java
@@ -0,0 +1,151 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+// Protobuf-backed implementation of AppAggregatorsMap, following the
+// standard YARN "viaProto" pattern: locally cached appId/aggregatorAddr
+// take precedence over the proto and are merged back on getProto().
+@Private
+@Unstable
+public class AppAggregatorsMapPBImpl extends AppAggregatorsMap {
+
+ AppAggregatorsMapProto proto =
+ AppAggregatorsMapProto.getDefaultInstance();
+
+ AppAggregatorsMapProto.Builder builder = null;
+ boolean viaProto = false;
+
+ // Lazily-populated local copies of the proto fields; null until read
+ // through a getter or explicitly set.
+ private ApplicationId appId = null;
+ private String aggregatorAddr = null;
+
+ public AppAggregatorsMapPBImpl() {
+ builder = AppAggregatorsMapProto.newBuilder();
+ }
+
+ public AppAggregatorsMapPBImpl(AppAggregatorsMapProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ // Flushes local modifications into the proto and returns the immutable
+ // message.
+ public AppAggregatorsMapProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ // Equality delegates to the serialized proto, as in the other YARN PB
+ // records.
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ // Lazily decodes appId from the proto on first read; returns null when
+ // the field was never set.
+ @Override
+ public ApplicationId getApplicationId() {
+ AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.appId == null && p.hasAppId()) {
+ this.appId = convertFromProtoFormat(p.getAppId());
+ }
+ return this.appId;
+ }
+
+ // Lazily reads the aggregator address from the proto on first access.
+ @Override
+ public String getAggregatorAddr() {
+ AppAggregatorsMapProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.aggregatorAddr == null
+ && p.hasAppAggregatorAddr()) {
+ this.aggregatorAddr = p.getAppAggregatorAddr();
+ }
+ return this.aggregatorAddr;
+ }
+
+ // A null argument clears the proto field; a non-null value is cached
+ // locally and serialized on the next getProto().
+ @Override
+ public void setApplicationId(ApplicationId appId) {
+ maybeInitBuilder();
+ if (appId == null) {
+ builder.clearAppId();
+ }
+ this.appId = appId;
+ }
+
+ @Override
+ public void setAggregatorAddr(String aggregatorAddr) {
+ maybeInitBuilder();
+ if (aggregatorAddr == null) {
+ builder.clearAppAggregatorAddr();
+ }
+ this.aggregatorAddr = aggregatorAddr;
+ }
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+ return new ApplicationIdPBImpl(p);
+ }
+
+ // Assumes the ApplicationId is the PB implementation; a different
+ // implementation would throw ClassCastException here.
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl) t).getProto();
+ }
+
+ // Ensures a mutable builder exists, seeding it from the current proto
+ // when we were previously in proto-only mode.
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = AppAggregatorsMapProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.appId != null) {
+ builder.setAppId(convertToProtoFormat(this.appId));
+ }
+ if (this.aggregatorAddr != null) {
+ builder.setAppAggregatorAddr(this.aggregatorAddr);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
new file mode 100644
index 0000000..d7b05c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/aggregatornodemanager_protocol.proto
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "AggregatorNodemanagerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+// Request/response message types are defined in the common service protos.
+import "yarn_server_common_service_protos.proto";
+
+// RPC service by which an application's timeline aggregator info is
+// reported to the NodeManager.
+service AggregatorNodemanagerProtocolService {
+ rpc reportNewAggregatorInfo (ReportNewAggregatorsInfoRequestProto) returns (ReportNewAggregatorsInfoResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 91473c5..3b03f58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -47,6 +47,7 @@ message NodeHeartbeatRequestProto {
optional NodeStatusProto node_status = 1;
optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3;
+ repeated AppAggregatorsMapProto registered_aggregators = 4;
}
message NodeHeartbeatResponseProto {
@@ -60,6 +61,7 @@ message NodeHeartbeatResponseProto {
optional string diagnostics_message = 8;
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
+ repeated AppAggregatorsMapProto app_aggregators_map = 11;
}
message SystemCredentialsForAppsProto {
@@ -67,6 +69,25 @@ message SystemCredentialsForAppsProto {
optional bytes credentialsForApp = 2;
}
+////////////////////////////////////////////////////////////////////////
+////// From aggregator_nodemanager_protocol ////////////////////////////
+////////////////////////////////////////////////////////////////////////
+// Maps one application to the address of its timeline aggregator.
+message AppAggregatorsMapProto {
+ optional ApplicationIdProto appId = 1;
+ optional string appAggregatorAddr = 2;
+}
+
+//////////////////////////////////////////////////////
+/////// aggregator_nodemanager_protocol //////////////
+//////////////////////////////////////////////////////
+// Request carrying the newly-registered aggregator(s) to report.
+message ReportNewAggregatorsInfoRequestProto {
+ repeated AppAggregatorsMapProto app_aggregators = 1;
+}
+
+// Empty acknowledgement for reportNewAggregatorInfo.
+message ReportNewAggregatorsInfoResponseProto {
+}
+
+
message NMContainerStatusProto {
optional ContainerIdProto container_id = 1;
optional ContainerStateProto container_state = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
new file mode 100644
index 0000000..af9d60f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -0,0 +1,345 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRPC {
+
+ private static final String EXCEPTION_MSG = "test error";
+ private static final String EXCEPTION_CAUSE = "exception cause";
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+ // Message the dummy aggregator service raises for a request that does not
+ // contain exactly one aggregator entry (asserted in the tests below).
+ public static final String ILLEGAL_NUMBER_MESSAGE =
+ "aggregators' number in ReportNewAggregatorsInfoRequest is not ONE.";
+
+ // Port 0 lets the OS pick an ephemeral port, avoiding conflicts in tests.
+ public static final String DEFAULT_AGGREGATOR_ADDR = "localhost:0";
+
+ public static final ApplicationId DEFAULT_APP_ID =
+ ApplicationId.newInstance(0, 0);
+
+ // Verifies that invoking a method of an unregistered protocol against a
+ // running ContainerManagementProtocol server fails with a clear
+ // "Unknown method" YarnException.
+ @Test
+ public void testUnknownCall() {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+ .getName());
+ YarnRPC rpc = YarnRPC.create(conf);
+ String bindAddr = "localhost:0";
+ InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+ Server server = rpc.getServer(ContainerManagementProtocol.class,
+ new DummyContainerManager(), addr, conf, null, 1);
+ server.start();
+
+ // Any unrelated protocol would do
+ ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy(
+ ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
+
+ try {
+ proxy.getNewApplication(Records
+ .newRecord(GetNewApplicationRequest.class));
+ Assert.fail("Excepted RPC call to fail with unknown method.");
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().matches(
+ "Unknown method getNewApplication called on.*"
+ + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
+ + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
+ } catch (Exception e) {
+ // NOTE(review): unexpected exception types are only printed, not
+ // failed on — the test passes silently in that case.
+ e.printStackTrace();
+ } finally {
+ server.stop();
+ }
+ }
+
+ // End-to-end RPC test for AggregatorNodemanagerProtocol: an unrelated
+ // protocol is rejected, a well-formed report succeeds, and an empty
+ // report triggers the dummy service's YarnException.
+ @Test
+ public void testRPCOnAggregatorNodeManagerProtocol() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+ .getName());
+ YarnRPC rpc = YarnRPC.create(conf);
+ String bindAddr = "localhost:0";
+ InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+ Server server = rpc.getServer(AggregatorNodemanagerProtocol.class,
+ new DummyNMAggregatorService(), addr, conf, null, 1);
+ server.start();
+
+ // Test unrelated protocol wouldn't get response
+ ApplicationClientProtocol unknownProxy = (ApplicationClientProtocol) rpc.getProxy(
+ ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
+
+ try {
+ unknownProxy.getNewApplication(Records
+ .newRecord(GetNewApplicationRequest.class));
+ Assert.fail("Excepted RPC call to fail with unknown method.");
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().matches(
+ "Unknown method getNewApplication called on.*"
+ + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
+ + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ // Test AggregatorNodemanagerProtocol get proper response
+ AggregatorNodemanagerProtocol proxy = (AggregatorNodemanagerProtocol)rpc.getProxy(
+ AggregatorNodemanagerProtocol.class, NetUtils.getConnectAddress(server), conf);
+ // Verify request with DEFAULT_APP_ID and DEFAULT_AGGREGATOR_ADDR get
+ // normally response.
+ try {
+ ReportNewAggregatorsInfoRequest request =
+ ReportNewAggregatorsInfoRequest.newInstance(
+ DEFAULT_APP_ID, DEFAULT_AGGREGATOR_ADDR);
+ proxy.reportNewAggregatorInfo(request);
+ } catch (YarnException e) {
+ Assert.fail("RPC call failured is not expected here.");
+ }
+
+ // Verify empty request get YarnException back (by design in
+ // DummyNMAggregatorService)
+ try {
+ proxy.reportNewAggregatorInfo(Records
+ .newRecord(ReportNewAggregatorsInfoRequest.class));
+ Assert.fail("Excepted RPC call to fail with YarnException.");
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
+ }
+
+ // NOTE(review): unlike testUnknownCall, stop() is not in a finally
+ // block, so a failed assertion above leaks the RPC server.
+ server.stop();
+ }
+
+ // Runs the generic container-management RPC round-trip test against the
+ // protobuf RPC engine implementation.
+ @Test
+ public void testHadoopProtoRPC() throws Exception {
+ test(HadoopYarnProtoRPC.class.getName());
+ }
+
+ // Full RPC round trip for ContainerManagementProtocol against the given
+ // YarnRPC implementation: starts a container via a signed token, checks
+ // its reported status is RUNNING, and verifies that a remote exception
+ // from stopContainers surfaces with its message and cause preserved.
+ private void test(String rpcClass) throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
+ YarnRPC rpc = YarnRPC.create(conf);
+ String bindAddr = "localhost:0";
+ InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+ Server server = rpc.getServer(ContainerManagementProtocol.class,
+ new DummyContainerManager(), addr, conf, null, 1);
+ server.start();
+ RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class);
+ ContainerManagementProtocol proxy = (ContainerManagementProtocol)
+ rpc.getProxy(ContainerManagementProtocol.class,
+ NetUtils.getConnectAddress(server), conf);
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ ApplicationId applicationId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId applicationAttemptId =
+ ApplicationAttemptId.newInstance(applicationId, 0);
+ ContainerId containerId =
+ ContainerId.newContainerId(applicationAttemptId, 100);
+ NodeId nodeId = NodeId.newInstance("localhost", 1234);
+ Resource resource = Resource.newInstance(1234, 2);
+ // Token expires 10s from now; the dummy server does not verify expiry
+ // beyond decoding the identifier.
+ ContainerTokenIdentifier containerTokenIdentifier =
+ new ContainerTokenIdentifier(containerId, "localhost", "user",
+ resource, System.currentTimeMillis() + 10000, 42, 42,
+ Priority.newInstance(0), 0);
+ Token containerToken = newContainerToken(nodeId, "password".getBytes(),
+ containerTokenIdentifier);
+
+ StartContainerRequest scRequest =
+ StartContainerRequest.newInstance(containerLaunchContext,
+ containerToken);
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+ list.add(scRequest);
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ proxy.startContainers(allRequests);
+
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
+ containerIds.add(containerId);
+ GetContainerStatusesRequest gcsRequest =
+ GetContainerStatusesRequest.newInstance(containerIds);
+ GetContainerStatusesResponse response =
+ proxy.getContainerStatuses(gcsRequest);
+ List<ContainerStatus> statuses = response.getContainerStatuses();
+
+ //test remote exception
+ boolean exception = false;
+ try {
+ StopContainersRequest stopRequest =
+ recordFactory.newRecordInstance(StopContainersRequest.class);
+ stopRequest.setContainerIds(containerIds);
+ proxy.stopContainers(stopRequest);
+ } catch (YarnException e) {
+ // DummyContainerManager always throws from stopContainers; confirm
+ // both the message and the nested cause survive the RPC boundary.
+ exception = true;
+ Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG));
+ Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE));
+ System.out.println("Test Exception is " + e.getMessage());
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ } finally {
+ server.stop();
+ }
+ Assert.assertTrue(exception);
+ Assert.assertNotNull(statuses.get(0));
+ Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState());
+ }
+
+ // In-process test stub for ContainerManagementProtocol: records a RUNNING
+ // status for each started container, echoes the recorded statuses, and
+ // always fails stopContainers with a known exception for the remote-
+ // exception assertions in test().
+ public class DummyContainerManager implements ContainerManagementProtocol {
+
+ private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+
+ @Override
+ public GetContainerStatusesResponse getContainerStatuses(
+ GetContainerStatusesRequest request)
+ throws YarnException {
+ GetContainerStatusesResponse response =
+ recordFactory.newRecordInstance(GetContainerStatusesResponse.class);
+ response.setContainerStatuses(statuses);
+ return response;
+ }
+
+ @Override
+ public StartContainersResponse startContainers(
+ StartContainersRequest requests) throws YarnException {
+ StartContainersResponse response =
+ recordFactory.newRecordInstance(StartContainersResponse.class);
+ for (StartContainerRequest request : requests.getStartContainerRequests()) {
+ Token containerToken = request.getContainerToken();
+ ContainerTokenIdentifier tokenId = null;
+
+ try {
+ tokenId = newContainerTokenIdentifier(containerToken);
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e);
+ }
+ // Record a RUNNING status keyed by the container id decoded from
+ // the token, for later getContainerStatuses calls.
+ ContainerStatus status =
+ recordFactory.newRecordInstance(ContainerStatus.class);
+ status.setState(ContainerState.RUNNING);
+ status.setContainerId(tokenId.getContainerID());
+ status.setExitStatus(0);
+ statuses.add(status);
+
+ }
+ return response;
+ }
+
+ // Deliberately fails so callers can verify remote-exception plumbing.
+ @Override
+ public StopContainersResponse stopContainers(StopContainersRequest request)
+ throws YarnException {
+ Exception e = new Exception(EXCEPTION_MSG,
+ new Exception(EXCEPTION_CAUSE));
+ throw new YarnException(e);
+ }
+ }
+
+ // Decodes the ContainerTokenIdentifier carried inside a YARN Token record:
+ // rebuilds a hadoop-security Token from the record's identifier, password,
+ // kind and service fields, then delegates to Token.decodeIdentifier().
+ // Throws IOException if the identifier bytes cannot be decoded.
+ public static ContainerTokenIdentifier newContainerTokenIdentifier(
+ Token containerToken) throws IOException {
+ org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
+ new org.apache.hadoop.security.token.Token<ContainerTokenIdentifier>(
+ containerToken.getIdentifier()
+ .array(), containerToken.getPassword().array(), new Text(
+ containerToken.getKind()),
+ new Text(containerToken.getService()));
+ return token.decodeIdentifier();
+ }
+
+ // Builds a container Token whose service field is the ip:port of the given
+ // node (as the RPC layer expects), using the serialized tokenIdentifier as
+ // the identifier bytes and the supplied password.
+ public static Token newContainerToken(NodeId nodeId, byte[] password,
+ ContainerTokenIdentifier tokenIdentifier) {
+ // RPC layer client expects ip:port as service for tokens
+ InetSocketAddress addr =
+ NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+ // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+ Token containerToken =
+ Token.newInstance(tokenIdentifier.getBytes(),
+ ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+ .buildTokenService(addr).toString());
+ return containerToken;
+ }
+
+ // A dummy implementation of AggregatorNodemanagerProtocol for test purposes:
+ // it can only accept one (appID, aggregatorAddr) pair; otherwise it throws exceptions.
+ public class DummyNMAggregatorService
+ implements AggregatorNodemanagerProtocol {
+
+ // Accepts exactly one (appId, aggregatorAddr) entry and asserts it matches
+ // DEFAULT_APP_ID / DEFAULT_AGGREGATOR_ADDR; any other list size fails with
+ // ILLEGAL_NUMBER_MESSAGE so the test can verify error propagation.
+ @Override
+ public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+ ReportNewAggregatorsInfoRequest request)
+ throws YarnException, IOException {
+ List<AppAggregatorsMap> appAggregators = request.getAppAggregatorsList();
+ if (appAggregators.size() == 1) {
+ // check default appID and aggregatorAddr
+ AppAggregatorsMap appAggregator = appAggregators.get(0);
+ Assert.assertEquals(appAggregator.getApplicationId(),
+ DEFAULT_APP_ID);
+ Assert.assertEquals(appAggregator.getAggregatorAddr(),
+ DEFAULT_AGGREGATOR_ADDR);
+ } else {
+ throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
+ }
+
+ ReportNewAggregatorsInfoResponse response =
+ recordFactory.newRecordInstance(ReportNewAggregatorsInfoResponse.class);
+ return response;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index 20983b6..47cf8ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -89,11 +91,14 @@ public class TestYarnServerApiClasses {
original.setLastKnownContainerTokenMasterKey(getMasterKey());
original.setLastKnownNMTokenMasterKey(getMasterKey());
original.setNodeStatus(getNodeStatus());
+ Map<ApplicationId, String> aggregators = getAggregators();
+ original.setRegisteredAggregators(aggregators);
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
original.getProto());
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
+ assertEquals(aggregators, copy.getRegisteredAggregators());
}
/**
@@ -110,6 +115,8 @@ public class TestYarnServerApiClasses {
original.setNextHeartBeatInterval(1000);
original.setNodeAction(NodeAction.NORMAL);
original.setResponseId(100);
+ Map<ApplicationId, String> aggregators = getAggregators();
+ original.setAppAggregatorsMap(aggregators);
NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
original.getProto());
@@ -119,6 +126,7 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
+ assertEquals(aggregators, copy.getAppAggregatorsMap());
}
/**
@@ -208,6 +216,15 @@ public class TestYarnServerApiClasses {
}
+ private Map<ApplicationId, String> getAggregators() {
+ ApplicationId appID = ApplicationId.newInstance(1L, 1);
+ String aggregatorAddr = "localhost:0";
+ Map<ApplicationId, String> aggregatorMap =
+ new HashMap<ApplicationId, String>();
+ aggregatorMap.put(appID, aggregatorAddr);
+ return aggregatorMap;
+ }
+
private ContainerStatus getContainerStatus(int applicationId,
int containerID, int appAttemptId) {
ContainerStatus status = recordFactory
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 6e7e2ec..85f3f0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -57,6 +57,19 @@ public interface Context {
ConcurrentMap<ApplicationId, Application> getApplications();
Map<ApplicationId, Credentials> getSystemCredentialsForApps();
+
+ /**
+ * Get the registered aggregators that are located on this NM.
+ * @return the registered aggregators.
+ */
+ Map<ApplicationId, String> getRegisteredAggregators();
+
+ /**
+ * Return the known aggregators, received from the RM, for all active applications
+ * running on this NM.
+ * @return known aggregators.
+ */
+ Map<ApplicationId, String> getKnownAggregators();
ConcurrentMap<ContainerId, Container> getContainers();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a4be120..10143db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.aggregatormanager.NMAggregatorService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -84,8 +85,9 @@ public class NodeManager extends CompositeService
private Context context;
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
+ private NMAggregatorService nmAggregatorService;
private NodeStatusUpdater nodeStatusUpdater;
- private static CompositeServiceShutdownHook nodeManagerShutdownHook;
+ private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private NMStateStoreService nmStore = null;
private AtomicBoolean isStopping = new AtomicBoolean(false);
@@ -112,6 +114,10 @@ public class NodeManager extends CompositeService
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, aclsManager, dirsHandler);
}
+
+ protected NMAggregatorService createNMAggregatorService(Context context) {
+ return new NMAggregatorService(context);
+ }
protected WebServer createWebServer(Context nmContext,
ResourceView resourceView, ApplicationACLsManager aclsManager,
@@ -268,6 +274,9 @@ public class NodeManager extends CompositeService
addService(dispatcher);
DefaultMetricsSystem.initialize("NodeManager");
+
+ this.nmAggregatorService = createNMAggregatorService(context);
+ addService(nmAggregatorService);
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
@@ -345,6 +354,12 @@ public class NodeManager extends CompositeService
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
+
+ protected Map<ApplicationId, String> registeredAggregators =
+ new ConcurrentHashMap<ApplicationId, String>();
+
+ protected Map<ApplicationId, String> knownAggregators =
+ new ConcurrentHashMap<ApplicationId, String>();
private final NMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInNM nmTokenSecretManager;
@@ -460,6 +475,30 @@ public class NodeManager extends CompositeService
Map<ApplicationId, Credentials> systemCredentials) {
this.systemCredentials = systemCredentials;
}
+
+ @Override
+ public Map<ApplicationId, String> getRegisteredAggregators() {
+ return this.registeredAggregators;
+ }
+
+ public void addRegisteredAggregators(
+ Map<ApplicationId, String> newRegisteredAggregators) {
+ this.registeredAggregators.putAll(newRegisteredAggregators);
+ // Update to knownAggregators as well so it can immediately be consumed by
+ // this NM's TimelineClient.
+ this.knownAggregators.putAll(newRegisteredAggregators);
+ }
+
+ @Override
+ public Map<ApplicationId, String> getKnownAggregators() {
+ return this.knownAggregators;
+ }
+
+ public void addKnownAggregators(
+ Map<ApplicationId, String> knownAggregators) {
+ this.knownAggregators.putAll(knownAggregators);
+ }
+
}
@@ -523,6 +562,11 @@ public class NodeManager extends CompositeService
public Context getNMContext() {
return this.context;
}
+
+ // For testing
+ NMAggregatorService getNMAggregatorService() {
+ return this.nmAggregatorService;
+ }
public static void main(String[] args) throws IOException {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 6ddd7e4..c855833 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -592,7 +592,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
- .getCurrentKey());
+ .getCurrentKey(),
+ NodeStatusUpdaterImpl.this.context.getRegisteredAggregators());
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -655,6 +656,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
((NMContext) context)
.setSystemCrendentialsForApps(parseCredentials(systemCredentials));
}
+
+ Map<ApplicationId, String> knownAggregators = response.getAppAggregatorsMap();
+ ((NodeManager.NMContext)context).addKnownAggregators(knownAggregators);
+
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
new file mode 100644
index 0000000..17150ba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/aggregatormanager/NMAggregatorService.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.aggregatormanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.CompositeService;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+
+ /**
+ * NM-side RPC service implementing AggregatorNodemanagerProtocol. It accepts
+ * reports of newly launched per-app timeline aggregators and records them in
+ * the NM context so they can be forwarded to the RM via heartbeats.
+ */
+ public class NMAggregatorService extends CompositeService implements
+ AggregatorNodemanagerProtocol {
+
+ private static final Log LOG = LogFactory.getLog(NMAggregatorService.class);
+
+ // NM context used to store registered aggregators.
+ final Context context;
+
+ // RPC server; created in serviceStart(), stopped in serviceStop().
+ private Server server;
+
+ public NMAggregatorService(Context context) {
+
+ super(NMAggregatorService.class.getName());
+ this.context = context;
+ }
+
+ /**
+ * Starts the RPC server on the configured aggregator-service address
+ * (NM_AGGREGATOR_SERVICE_ADDRESS, honoring NM_BIND_HOST) with the
+ * configured handler thread count, then starts child services.
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+ Configuration conf = getConfig();
+
+ InetSocketAddress aggregatorServerAddress = conf.getSocketAddr(
+ YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
+
+ Configuration serverConf = new Configuration(conf);
+
+ // TODO Security settings.
+ YarnRPC rpc = YarnRPC.create(conf);
+
+ server =
+ rpc.getServer(AggregatorNodemanagerProtocol.class, this,
+ aggregatorServerAddress, serverConf,
+ this.context.getNMTokenSecretManager(),
+ conf.getInt(YarnConfiguration.NM_AGGREGATOR_SERVICE_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT));
+
+ server.start();
+ // start remaining services
+ super.serviceStart();
+ LOG.info("NMAggregatorService started at " + aggregatorServerAddress);
+ }
+
+
+ @Override
+ public void serviceStop() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ // TODO may cleanup app aggregators running on this NM in future.
+ super.serviceStop();
+ }
+
+ /**
+ * Records every (applicationId, aggregatorAddr) pair from the request into
+ * the NM context's registered-aggregator map. An empty or null list is a
+ * no-op; a response is returned in all cases.
+ */
+ @Override
+ public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+ ReportNewAggregatorsInfoRequest request) throws IOException {
+ List<AppAggregatorsMap> newAggregatorsList = request.getAppAggregatorsList();
+ if (newAggregatorsList != null && !newAggregatorsList.isEmpty()) {
+ Map<ApplicationId, String> newAggregatorsMap =
+ new HashMap<ApplicationId, String>();
+ for (AppAggregatorsMap aggregator : newAggregatorsList) {
+ newAggregatorsMap.put(aggregator.getApplicationId(), aggregator.getAggregatorAddr());
+ }
+ ((NodeManager.NMContext)context).addRegisteredAggregators(newAggregatorsMap);
+ }
+
+ return ReportNewAggregatorsInfoResponse.newInstance();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index a73b113..6bf3bbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -425,6 +425,10 @@ public class ApplicationImpl implements Application {
new LogHandlerAppFinishedEvent(app.appId));
app.context.getNMTokenSecretManager().appFinished(app.getAppId());
+ // Remove aggregator info for finished apps.
+ // TODO check we remove related aggregators info in failure cases (YARN-3038)
+ app.context.getRegisteredAggregators().remove(app.getAppId());
+ app.context.getKnownAggregators().remove(app.getAppId());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 1c7f987..2eb1a7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -344,6 +344,8 @@ public class ApplicationMasterService extends AbstractService implements
RMApp rmApp =
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
+ // Remove the aggregator address when the app finishes.
+ rmApp.removeAggregatorAddr();
// checking whether the app exits in RMStateStore at first not to throw
// ApplicationDoesNotExistInCacheException before and after
// RM work-preserving restart.
@@ -576,6 +578,10 @@ public class ApplicationMasterService extends AbstractService implements
allocateResponse.setAvailableResources(allocation.getResourceLimit());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+
+ // add aggregator address for this application
+ allocateResponse.setAggregatorAddr(
+ this.rmContext.getRMApps().get(applicationId).getAggregatorAddr());
// add preemption to the allocateResponse message (if any)
allocateResponse
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 0de556b..f163a28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
@@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppAggregatorUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -406,6 +410,11 @@ public class ResourceTrackerService extends AbstractService implements
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
return resync;
}
+
+ // Check & update aggregators info from request.
+ // TODO make sure there is no race condition in the AM-failover case,
+ // where an older registration could possibly override a newer one.
+ updateAppAggregatorsMap(request);
// Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
@@ -421,15 +430,72 @@ public class ResourceTrackerService extends AbstractService implements
if (!systemCredentials.isEmpty()) {
nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
}
+
+ // Return the aggregators' map that the NM needs to know about.
+ // TODO we should optimize this to only include aggregator info that the NM
+ // doesn't know yet.
+ List<ApplicationId> keepAliveApps = remoteNodeStatus.getKeepAliveApplications();
+ if (keepAliveApps != null) {
+ setAppAggregatorsMapToResponse(keepAliveApps, nodeHeartBeatResponse);
+ }
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(),
- remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+ keepAliveApps, nodeHeartBeatResponse));
return nodeHeartBeatResponse;
}
+
+ // Populate the heartbeat response with the aggregator address of each live
+ // application so the NM learns where to forward timeline data.
+ private void setAppAggregatorsMapToResponse(
+ List<ApplicationId> liveApps, NodeHeartbeatResponse response) {
+ Map<ApplicationId, String> liveAppAggregatorsMap = new
+ ConcurrentHashMap<ApplicationId, String>();
+ Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
+ for (ApplicationId appId : liveApps) {
+ RMApp rmApp = rmApps.get(appId);
+ if (rmApp == null) {
+ // The app may already have been removed from RMContext (e.g. just
+ // completed); skip it rather than NPE on the heartbeat path.
+ continue;
+ }
+ String appAggregatorAddr = rmApp.getAggregatorAddr();
+ if (appAggregatorAddr != null) {
+ liveAppAggregatorsMap.put(appId, appAggregatorAddr);
+ } else {
+ // Log a debug info if aggregator address is not found.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Aggregator for application: " + appId + " hasn't registered yet!");
+ }
+ }
+ }
+ response.setAppAggregatorsMap(liveAppAggregatorsMap);
+ }
+
+ // For each aggregator newly registered on the heartbeating NM, fire an
+ // RMAppAggregatorUpdateEvent if the address differs from what the RMApp
+ // currently records. Null/empty addresses and unknown apps are skipped.
+ private void updateAppAggregatorsMap(NodeHeartbeatRequest request) {
+ Map<ApplicationId, String> registeredAggregatorsMap =
+ request.getRegisteredAggregators();
+ if (registeredAggregatorsMap != null
+ && !registeredAggregatorsMap.isEmpty()) {
+ Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
+ for (Map.Entry<ApplicationId, String> entry:
+ registeredAggregatorsMap.entrySet()) {
+ ApplicationId appId = entry.getKey();
+ String aggregatorAddr = entry.getValue();
+ if (aggregatorAddr != null && !aggregatorAddr.isEmpty()) {
+ RMApp rmApp = rmApps.get(appId);
+ if (rmApp == null) {
+ LOG.warn("Cannot update aggregator info because application ID: " +
+ appId + " is not found in RMContext!");
+ } else {
+ String previousAggregatorAddr = rmApp.getAggregatorAddr();
+ // Compare with equals(), not ==: addresses arrive over RPC and are
+ // distinct String instances even when their contents match.
+ if (previousAggregatorAddr == null ||
+ !previousAggregatorAddr.equals(aggregatorAddr)) {
+ // sending aggregator update event.
+ RMAppAggregatorUpdateEvent event =
+ new RMAppAggregatorUpdateEvent(appId, aggregatorAddr);
+ rmContext.getDispatcher().getEventHandler().handle(event);
+ }
+ }
+ }
+ }
+ }
+ }
private void populateKeys(NodeHeartbeatRequest request,
NodeHeartbeatResponse nodeHeartBeatResponse) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index fbcaab9..f81edb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -172,6 +172,23 @@ public interface RMApp extends EventHandler<RMAppEvent> {
* @return the tracking url for the application master.
*/
String getTrackingUrl();
+
+ /**
+ * The aggregator address for the application.
+ * @return the address for the application's aggregator.
+ */
+ String getAggregatorAddr();
+
+ /**
+ * Set aggregator address for the application
+ * @param aggregatorAddr the address of aggregator
+ */
+ void setAggregatorAddr(String aggregatorAddr);
+
+ /**
+ * Remove aggregator address when application is finished or killed.
+ */
+ void removeAggregatorAddr();
/**
* The original tracking url for the application master.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
new file mode 100644
index 0000000..b43de44
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppAggregatorUpdateEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class RMAppAggregatorUpdateEvent extends RMAppEvent {
+
+ private final String appAggregatorAddr;
+
+ public RMAppAggregatorUpdateEvent(ApplicationId appId, String appAggregatorAddr) {
+ super(appId, RMAppEventType.AGGREGATOR_UPDATE);
+ this.appAggregatorAddr = appAggregatorAddr;
+ }
+
+ public String getAppAggregatorAddr(){
+ return this.appAggregatorAddr;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
index 668c5e1..6e9460a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
@@ -30,6 +30,9 @@ public enum RMAppEventType {
// Source: Scheduler
APP_ACCEPTED,
+
+ // TODO add source later
+ AGGREGATOR_UPDATE,
// Source: RMAppAttempt
ATTEMPT_REGISTERED,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 2d1737a..6a076ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -134,6 +134,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private long startTime;
private long finishTime = 0;
private long storedFinishTime = 0;
+ private String aggregatorAddr;
// This field isn't protected by readlock now.
private volatile RMAppAttempt currentAttempt;
private String queue;
@@ -165,6 +166,8 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.NEW, RMAppState.NEW,
+ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
@@ -181,6 +184,8 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from NEW_SAVING state
.addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
.addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
@@ -199,6 +204,8 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
+ .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
+ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(
@@ -215,6 +222,8 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.MOVE, new RMAppMoveTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
RMAppEventType.ATTEMPT_REGISTERED)
.addTransition(RMAppState.ACCEPTED,
@@ -241,6 +250,8 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.MOVE, new RMAppMoveTransition())
+ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_UNREGISTERED,
new FinalSavingTransition(
@@ -270,6 +281,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -281,6 +294,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -292,6 +307,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+ RMAppEventType.AGGREGATOR_UPDATE, new RMAppAggregatorUpdateTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
@@ -488,6 +505,21 @@ public class RMAppImpl implements RMApp, Recoverable {
public void setQueue(String queue) {
this.queue = queue;
}
+
+ @Override
+ public String getAggregatorAddr() {
+ return this.aggregatorAddr;
+ }
+
+ @Override
+ public void setAggregatorAddr(String aggregatorAddr) {
+ this.aggregatorAddr = aggregatorAddr;
+ }
+
+ @Override
+ public void removeAggregatorAddr() {
+ this.aggregatorAddr = null;
+ }
@Override
public String getName() {
@@ -737,6 +769,8 @@ public class RMAppImpl implements RMApp, Recoverable {
this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
+ // TODO: recover the aggregator address from the saved application state.
+ //this.aggregatorAddr = appState.getAggregatorAddr();
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
@@ -778,9 +812,24 @@ public class RMAppImpl implements RMApp, Recoverable {
SingleArcTransition<RMAppImpl, RMAppEvent> {
public void transition(RMAppImpl app, RMAppEvent event) {
};
-
}
+ private static final class RMAppAggregatorUpdateTransition
+ extends RMAppTransition {
+
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ LOG.info("Updating aggregator info for app: " + app.getApplicationId());
+
+ RMAppAggregatorUpdateEvent appAggregatorUpdateEvent =
+ (RMAppAggregatorUpdateEvent) event;
+ // Update aggregator address
+ app.setAggregatorAddr(appAggregatorUpdateEvent.getAppAggregatorAddr());
+
+ // TODO: persist the aggregator address to the RMStateStore
+ // so it can be recovered after an RM restart.
+ };
+ }
+
private static final class RMAppNodeUpdateTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppNodeUpdateEvent nodeUpdateEvent = (RMAppNodeUpdateEvent) event;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index f8d92aa..0d0895a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -92,6 +92,18 @@ public abstract class MockAsm extends MockApps {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
+ public String getAggregatorAddr() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+ @Override
+ public void setAggregatorAddr(String aggregatorAddr) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+ @Override
+ public void removeAggregatorAddr() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+ @Override
public ApplicationId getApplicationId() {
throw new UnsupportedOperationException("Not supported yet.");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index ec990f9..96952d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -271,4 +271,19 @@ public class MockRMApp implements RMApp {
public ResourceRequest getAMResourceRequest() {
return this.amReq;
}
+
+ @Override
+ public String getAggregatorAddr() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void removeAggregatorAddr() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void setAggregatorAddr(String aggregatorAddr) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
index cdc4e35..19920fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeTimelineAggregatorsAuxService.java
@@ -94,10 +94,9 @@ public class PerNodeTimelineAggregatorsAuxService extends AuxiliaryService {
* @return whether it was added successfully
*/
public boolean addApplication(ApplicationId appId) {
- String appIdString = appId.toString();
AppLevelTimelineAggregator aggregator =
- new AppLevelTimelineAggregator(appIdString);
- return (aggregatorCollection.putIfAbsent(appIdString, aggregator)
+ new AppLevelTimelineAggregator(appId.toString());
+ return (aggregatorCollection.putIfAbsent(appId, aggregator)
== aggregator);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
index 73b6d52..d6e2a18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregatorsCollection.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+import java.io.IOException;
import java.net.URI;
+import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -30,9 +32,15 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -62,6 +70,12 @@ public class TimelineAggregatorsCollection extends CompositeService {
// REST server for this aggregator collection
private HttpServer2 timelineRestServer;
+
+ private String timelineRestServerBindAddress;
+
+ private AggregatorNodemanagerProtocol nmAggregatorService;
+
+ private InetSocketAddress nmAggregatorServiceAddress;
static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
@@ -74,6 +88,16 @@ public class TimelineAggregatorsCollection extends CompositeService {
}
@Override
+ public void serviceInit(Configuration conf) throws Exception {
+ this.nmAggregatorServiceAddress = conf.getSocketAddr(
+ YarnConfiguration.NM_BIND_HOST,
+ YarnConfiguration.NM_AGGREGATOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS,
+ YarnConfiguration.DEFAULT_NM_AGGREGATOR_SERVICE_PORT);
+
+ }
+
+ @Override
protected void serviceStart() throws Exception {
startWebApp();
super.serviceStart();
@@ -95,9 +119,13 @@ public class TimelineAggregatorsCollection extends CompositeService {
* starting the app level service
* @return the aggregator associated with id after the potential put.
*/
- public TimelineAggregator putIfAbsent(String id, TimelineAggregator aggregator) {
+ public TimelineAggregator putIfAbsent(ApplicationId appId,
+ TimelineAggregator aggregator) {
+ String id = appId.toString();
+ TimelineAggregator aggregatorInTable;
+ boolean aggregatorIsNew = false;
synchronized (aggregators) {
- TimelineAggregator aggregatorInTable = aggregators.get(id);
+ aggregatorInTable = aggregators.get(id);
if (aggregatorInTable == null) {
try {
// initialize, start, and add it to the collection so it can be
@@ -106,16 +134,30 @@ public class TimelineAggregatorsCollection extends CompositeService {
aggregator.start();
aggregators.put(id, aggregator);
LOG.info("the aggregator for " + id + " was added");
- return aggregator;
+ aggregatorInTable = aggregator;
+ aggregatorIsNew = true;
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
} else {
String msg = "the aggregator for " + id + " already exists!";
LOG.error(msg);
- return aggregatorInTable;
+ }
+
+ }
+ // Report to NM if a new aggregator is added.
+ if (aggregatorIsNew) {
+ try {
+ reportNewAggregatorToNM(appId);
+ } catch (Exception e) {
+ // rethrow: the aggregator cannot be used if reporting it to the NM failed
+ LOG.error("Failed to report a new aggregator for application: " + appId +
+ " to NM Aggregator Services.");
+ throw new YarnRuntimeException(e);
}
}
+
+ return aggregatorInTable;
}
/**
@@ -167,7 +209,10 @@ public class TimelineAggregatorsCollection extends CompositeService {
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
- LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
+ this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
+ NetUtils.createSocketAddr(bindAddress));
+ LOG.info("Instantiating the per-node aggregator webapp at " +
+ timelineRestServerBindAddress);
try {
Configuration confForInfoServer = new Configuration(conf);
confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
@@ -200,4 +245,27 @@ public class TimelineAggregatorsCollection extends CompositeService {
throw new YarnRuntimeException(msg, e);
}
}
+
+ private void reportNewAggregatorToNM(ApplicationId appId)
+ throws YarnException, IOException {
+ this.nmAggregatorService = getNMAggregatorService();
+ ReportNewAggregatorsInfoRequest request =
+ ReportNewAggregatorsInfoRequest.newInstance(appId,
+ this.timelineRestServerBindAddress);
+ LOG.info("Report a new aggregator for application: " + appId +
+ " to NM Aggregator Services.");
+ nmAggregatorService.reportNewAggregatorInfo(request);
+ }
+
+ // protected for test
+ protected AggregatorNodemanagerProtocol getNMAggregatorService(){
+ Configuration conf = getConfig();
+ final YarnRPC rpc = YarnRPC.create(conf);
+
+ // TODO Security settings.
+ return (AggregatorNodemanagerProtocol) rpc.getProxy(
+ AggregatorNodemanagerProtocol.class,
+ nmAggregatorServiceAddress, conf);
+ }
+
}