You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/02/11 17:56:27 UTC
[3/4] hadoop git commit: pre-commit
pre-commit
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3006e23e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3006e23e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3006e23e
Branch: refs/heads/yarn-2877
Commit: 3006e23ebf893985ccbff35d23203b55fcdf66e4
Parents: fd6a6da
Author: Arun Suresh <as...@apache.org>
Authored: Thu Feb 11 00:51:16 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Feb 11 00:51:16 2016 -0800
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 42 ++
.../yarn/api/records/impl/pb/ProtoUtils.java | 13 +
.../hadoop-yarn-server-common/pom.xml | 1 +
.../api/DistributedSchedulerProtocol.java | 77 ++++
.../api/DistributedSchedulerProtocolPB.java | 36 ++
.../hadoop/yarn/server/api/ServerRMProxy.java | 4 +
...istributedSchedulerProtocolPBClientImpl.java | 151 +++++++
...stributedSchedulerProtocolPBServiceImpl.java | 143 ++++++
.../DistSchedAllocateResponse.java | 58 +++
.../DistSchedRegisterResponse.java | 102 +++++
.../pb/DistSchedAllocateResponsePBImpl.java | 180 ++++++++
.../pb/DistSchedRegisterResponsePBImpl.java | 304 +++++++++++++
.../proto/distributed_scheduler_protocol.proto | 38 ++
.../yarn_server_common_service_protos.proto | 15 +
.../hadoop/yarn/server/nodemanager/Context.java | 5 +
.../yarn/server/nodemanager/NodeManager.java | 36 +-
.../nodemanager/amrmproxy/AMRMProxyService.java | 8 +
.../amrmproxy/AbstractRequestInterceptor.java | 42 ++
.../amrmproxy/DefaultRequestInterceptor.java | 88 +++-
.../amrmproxy/RequestInterceptor.java | 3 +-
.../nodemanager/scheduler/LocalScheduler.java | 438 +++++++++++++++++++
.../OpportunisticContainerAllocator.java | 184 ++++++++
.../security/NMTokenSecretManagerInNM.java | 24 +-
.../yarn/server/nodemanager/TestEventFlow.java | 2 +-
.../nodemanager/TestNodeStatusUpdater.java | 4 +-
.../amrmproxy/BaseAMRMProxyTest.java | 10 +
.../BaseContainerManagerTest.java | 2 +-
.../TestContainerManagerRecovery.java | 2 +-
.../launcher/TestContainerLaunch.java | 2 +-
.../TestLocalCacheDirectoryManager.java | 3 +-
.../TestResourceLocalizationService.java | 4 +-
.../scheduler/TestLocalScheduler.java | 212 +++++++++
.../webapp/TestContainerLogsPage.java | 6 +-
.../nodemanager/webapp/TestNMWebServer.java | 4 +-
.../nodemanager/webapp/TestNMWebServices.java | 2 +-
.../webapp/TestNMWebServicesApps.java | 2 +-
.../webapp/TestNMWebServicesContainers.java | 2 +-
.../ApplicationMasterService.java | 39 +-
.../DistributedSchedulingService.java | 162 +++++++
.../server/resourcemanager/ResourceManager.java | 10 +
.../scheduler/AppSchedulingInfo.java | 3 +-
.../yarn/server/resourcemanager/MockRM.java | 17 +
.../TestDistributedSchedulingService.java | 170 +++++++
43 files changed, 2609 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d84c155..9b54675 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -291,6 +291,48 @@ public class YarnConfiguration extends Configuration {
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
+ /** Whether Distributed Scheduling is enabled. */
+ public static final String DIST_SCHEDULING_ENABLED =
+ YARN_PREFIX + "distributed-scheduling.enabled";
+ public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false;
+
+ /** Minimum allocatable container memory for Distributed Scheduling */
+ public static final String DIST_SCHEDULING_MIN_MEMORY =
+ YARN_PREFIX + "distributed-scheduling.min-memory";
+ public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512;
+
+ /** Minimum allocatable container vcores for Distributed Scheduling */
+ public static final String DIST_SCHEDULING_MIN_VCORES =
+ YARN_PREFIX + "distributed-scheduling.min-vcores";
+ public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1;
+
+ /** Maximum allocatable container memory for Distributed Scheduling */
+ public static final String DIST_SCHEDULING_MAX_MEMORY =
+ YARN_PREFIX + "distributed-scheduling.max-memory";
+ public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048;
+
+ /** Maximum allocatable container vcores for Distributed Scheduling */
+ public static final String DIST_SCHEDULING_MAX_VCORES =
+ YARN_PREFIX + "distributed-scheduling.max-vcores";
+ public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4;
+
+ /** Incremental allocatable container memory for Distributed Scheduling */
+ public static final String DIST_SCHEDULING_INCR_MEMORY =
+ YARN_PREFIX + "distributed-scheduling.incr-memory";
+ public static final int DIST_SCHEDULING_INCR_MEMORY_DEFAULT = 512;
+
+ /** Incremental allocatable container vcores for Distributed Scheduling */
+ public static final String DIST_SCHEDULING_INCR_VCORES =
+ YARN_PREFIX + "distributed-scheduling.incr-vcores";
+ public static final int DIST_SCHEDULING_INCR_VCORES_DEFAULT = 1;
+
+ /** Container token expiry for containers allocated via
+ * Distributed Scheduling */
+ public static final String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS =
+ YARN_PREFIX + "distributed-scheduling.container-token-expiry";
+ public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
+ 600000;
+
/**
* Enable/disable intermediate-data encryption at YARN level. For now, this
* only is used by the FileSystemRMStateStore to setup right file-system
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 29ed0f3..ccdfc64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -37,8 +37,10 @@ import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
@@ -294,4 +296,15 @@ public class ProtoUtils {
public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) {
return ExecutionType.valueOf(e.name());
}
+
+ /*
+ * Resource
+ */
+ public static synchronized YarnProtos.ResourceProto convertToProtoFormat(Resource r) {
+ return ((ResourcePBImpl) r).getProto();
+ }
+
+ public static Resource convertFromProtoFormat(YarnProtos.ResourceProto resource) {
+ return new ResourcePBImpl(resource);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 2958b81..3a6bddd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -142,6 +142,7 @@
<source>
<directory>${basedir}/src/main/proto</directory>
<includes>
+ <include>distributed_scheduler_protocol.proto</include>
<include>yarn_server_common_protos.proto</include>
<include>yarn_server_common_service_protos.proto</include>
<include>yarn_server_common_service_protos.proto</include>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
new file mode 100644
index 0000000..47de8cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocol.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * <p>This protocol extends the <code>ApplicationMasterProtocol</code>. It is
+ * used by the <code>LocalScheduler</code> running on the NodeManager to wrap
+ * the request / response objects of the <code>registerApplicationMaster</code>
+ * and <code>allocate</code> methods of the protocol with additional information
+ * required to perform Distributed Scheduling.
+ * </p>
+ */
+public interface DistributedSchedulerProtocol extends ApplicationMasterProtocol {
+
+ /**
+ * <p> Extends the <code>registerApplicationMaster</code> method to wrap the
+ * response with additional metadata.</p>
+ *
+ * @param request ApplicationMaster registration request
+ * @return A <code>DistSchedRegisterResponse</code> that contains a standard
+ * AM registration response along with additional information required
+ * for Distributed Scheduling
+ * @throws YarnException
+ * @throws IOException
+ */
+ @Public
+ @Unstable
+ @Idempotent
+ DistSchedRegisterResponse registerApplicationMasterForDistributedScheduling(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException;
+
+ /**
+ * <p> Extends the <code>allocate</code> method to wrap the response with
+ * additional metadata.</p>
+ *
+ * @param request ApplicationMaster allocate request
+ * @return A <code>DistSchedAllocateResponse</code> that contains a standard
+ * AM allocate response along with additional information required
+ * for Distributed Scheduling
+ * @throws YarnException
+ * @throws IOException
+ */
+ @Public
+ @Unstable
+ @Idempotent
+ DistSchedAllocateResponse allocateForDistributedScheduling(
+ AllocateRequest request) throws YarnException, IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
new file mode 100644
index 0000000..413b9c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/DistributedSchedulerProtocolPB.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
+
+import org.apache.hadoop.yarn.proto.DistributedSchedulerProtocol;
+
+@Private
+@Unstable
+@ProtocolInfo(protocolName = "org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB",
+ protocolVersion = 1)
+public interface DistributedSchedulerProtocolPB extends
+ DistributedSchedulerProtocol.DistributedSchedulerProtocolService.BlockingInterface,
+ ApplicationMasterProtocolService.BlockingInterface {
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
index 2d4085f..e3ac924 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
@@ -78,6 +78,10 @@ public class ServerRMProxy<T> extends RMProxy<T> {
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
+ } else if (protocol == DistributedSchedulerProtocol.class ) {
+ return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
} else {
String message = "Unsupported protocol found when creating the proxy " +
"connection to ResourceManager: " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
new file mode 100644
index 0000000..c1dd9e5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/DistributedSchedulerProtocolPBClientImpl.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+ .FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+ .FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+ .RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+ .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class DistributedSchedulerProtocolPBClientImpl implements
+ DistributedSchedulerProtocol, Closeable {
+
+ private DistributedSchedulerProtocolPB proxy;
+
+ public DistributedSchedulerProtocolPBClientImpl(long clientVersion,
+ InetSocketAddress addr,
+ Configuration conf) throws IOException {
+ RPC.setProtocolEngine(conf, DistributedSchedulerProtocolPB.class,
+ ProtobufRpcEngine.class);
+ proxy = RPC.getProxy(DistributedSchedulerProtocolPB.class, clientVersion,
+ addr, conf);
+ }
+
+ @Override
+ public void close() {
+ if (this.proxy != null) {
+ RPC.stopProxy(this.proxy);
+ }
+ }
+
+ @Override
+ public DistSchedRegisterResponse
+ registerApplicationMasterForDistributedScheduling
+ (RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
+ ((RegisterApplicationMasterRequestPBImpl) request).getProto();
+ try {
+ return new DistSchedRegisterResponsePBImpl(
+ proxy.registerApplicationMasterForDistributedScheduling(
+ null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public DistSchedAllocateResponse allocateForDistributedScheduling
+ (AllocateRequest request) throws YarnException, IOException {
+ YarnServiceProtos.AllocateRequestProto requestProto =
+ ((AllocateRequestPBImpl) request).getProto();
+ try {
+ return new DistSchedAllocateResponsePBImpl(
+ proxy.allocateForDistributedScheduling(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster
+ (RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ YarnServiceProtos.RegisterApplicationMasterRequestProto requestProto =
+ ((RegisterApplicationMasterRequestPBImpl) request).getProto();
+ try {
+ return new RegisterApplicationMasterResponsePBImpl(
+ proxy.registerApplicationMaster(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster
+ (FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ YarnServiceProtos.FinishApplicationMasterRequestProto requestProto =
+ ((FinishApplicationMasterRequestPBImpl) request).getProto();
+ try {
+ return new FinishApplicationMasterResponsePBImpl(
+ proxy.finishApplicationMaster(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request) throws
+ YarnException, IOException {
+ YarnServiceProtos.AllocateRequestProto requestProto =
+ ((AllocateRequestPBImpl) request).getProto();
+ try {
+ return new AllocateResponsePBImpl(proxy.allocate(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
new file mode 100644
index 0000000..8be2893
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/DistributedSchedulerProtocolPBServiceImpl.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedAllocateResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistSchedRegisterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+ .FinishApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+ .FinishApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
+
+import java.io.IOException;
+
+public class DistributedSchedulerProtocolPBServiceImpl implements
+ DistributedSchedulerProtocolPB {
+
+ private DistributedSchedulerProtocol real;
+
+ public DistributedSchedulerProtocolPBServiceImpl(
+ DistributedSchedulerProtocol impl) {
+ this.real = impl;
+ }
+
+ @Override
+ public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto
+ registerApplicationMasterForDistributedScheduling(RpcController controller,
+ RegisterApplicationMasterRequestProto proto) throws
+ ServiceException {
+ RegisterApplicationMasterRequestPBImpl request = new
+ RegisterApplicationMasterRequestPBImpl(proto);
+ try {
+ DistSchedRegisterResponse response =
+ real.registerApplicationMasterForDistributedScheduling(request);
+ return ((DistSchedRegisterResponsePBImpl) response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto
+ allocateForDistributedScheduling(RpcController controller,
+ AllocateRequestProto proto) throws ServiceException {
+ AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+ try {
+ DistSchedAllocateResponse response = real
+ .allocateForDistributedScheduling(request);
+ return ((DistSchedAllocateResponsePBImpl) response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public YarnServiceProtos.AllocateResponseProto allocate(RpcController arg0,
+ AllocateRequestProto proto) throws ServiceException {
+ AllocateRequestPBImpl request = new AllocateRequestPBImpl(proto);
+ try {
+ AllocateResponse response = real.allocate(request);
+ return ((AllocateResponsePBImpl) response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public YarnServiceProtos.FinishApplicationMasterResponseProto
+ finishApplicationMaster(
+ RpcController arg0, YarnServiceProtos
+ .FinishApplicationMasterRequestProto proto)
+ throws ServiceException {
+ FinishApplicationMasterRequestPBImpl request = new
+ FinishApplicationMasterRequestPBImpl(proto);
+ try {
+ FinishApplicationMasterResponse response = real.finishApplicationMaster
+ (request);
+ return ((FinishApplicationMasterResponsePBImpl) response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public YarnServiceProtos.RegisterApplicationMasterResponseProto
+ registerApplicationMaster(
+ RpcController arg0, RegisterApplicationMasterRequestProto proto)
+ throws ServiceException {
+ RegisterApplicationMasterRequestPBImpl request = new
+ RegisterApplicationMasterRequestPBImpl(proto);
+ try {
+ RegisterApplicationMasterResponse response = real
+ .registerApplicationMaster(request);
+ return ((RegisterApplicationMasterResponsePBImpl) response).getProto();
+ } catch (YarnException e) {
+ throw new ServiceException(e);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
new file mode 100644
index 0000000..5f6e069
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedAllocateResponse.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+/**
+ * Response record for a distributed-scheduling allocate call. It carries a
+ * regular {@link AllocateResponse} together with a list of {@link NodeId}s
+ * for scheduling.
+ */
+@Public
+@Unstable
+public abstract class DistSchedAllocateResponse {
+
+  /**
+   * Creates a new record through {@link Records#newRecord(Class)} and stores
+   * the given allocate response in it.
+   *
+   * @param allResp the allocate response to wrap
+   * @return the newly created record
+   */
+  @Public
+  @Unstable
+  public static DistSchedAllocateResponse newInstance(
+      AllocateResponse allResp) {
+    DistSchedAllocateResponse wrapper =
+        Records.newRecord(DistSchedAllocateResponse.class);
+    wrapper.setAllocateResponse(allResp);
+    return wrapper;
+  }
+
+  /**
+   * Stores the wrapped allocate response.
+   *
+   * @param response the allocate response to carry
+   */
+  @Public
+  @Unstable
+  public abstract void setAllocateResponse(AllocateResponse response);
+
+  /**
+   * @return the wrapped allocate response
+   */
+  @Public
+  @Unstable
+  public abstract AllocateResponse getAllocateResponse();
+
+  /**
+   * Stores the list of nodes for scheduling.
+   *
+   * @param nodesForScheduling the node ids to carry
+   */
+  @Public
+  @Unstable
+  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+
+  /**
+   * @return the list of nodes for scheduling
+   */
+  @Public
+  @Unstable
+  public abstract List<NodeId> getNodesForScheduling();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
new file mode 100644
index 0000000..e4e5138
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/DistSchedRegisterResponse.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+/**
+ * Response record for a distributed-scheduling register call. It carries a
+ * regular {@link RegisterApplicationMasterResponse} plus the additional
+ * values exposed by the accessors below.
+ */
+@Public
+@Unstable
+public abstract class DistSchedRegisterResponse {
+
+  /**
+   * Creates a new record through {@link Records#newRecord(Class)} and stores
+   * the given register response in it.
+   *
+   * @param regAMResp the register response to wrap
+   * @return the newly created record
+   */
+  @Public
+  @Unstable
+  public static DistSchedRegisterResponse newInstance(
+      RegisterApplicationMasterResponse regAMResp) {
+    DistSchedRegisterResponse response =
+        Records.newRecord(DistSchedRegisterResponse.class);
+    response.setRegisterResponse(regAMResp);
+    return response;
+  }
+
+  /**
+   * @param resp the register response to carry
+   */
+  @Public
+  @Unstable
+  public abstract void setRegisterResponse(
+      RegisterApplicationMasterResponse resp);
+
+  /**
+   * @return the wrapped register response
+   */
+  @Public
+  @Unstable
+  public abstract RegisterApplicationMasterResponse getRegisterResponse();
+
+  /**
+   * @param minResource the minimum allocatable capability
+   */
+  @Public
+  @Unstable
+  public abstract void setMinAllocatableCapabilty(Resource minResource);
+
+  /**
+   * @return the minimum allocatable capability
+   */
+  @Public
+  @Unstable
+  public abstract Resource getMinAllocatableCapabilty();
+
+  /**
+   * @param maxResource the maximum allocatable capability
+   */
+  @Public
+  @Unstable
+  public abstract void setMaxAllocatableCapabilty(Resource maxResource);
+
+  /**
+   * @return the maximum allocatable capability
+   */
+  @Public
+  @Unstable
+  public abstract Resource getMaxAllocatableCapabilty();
+
+  /**
+   * @param incrResource the incremental allocatable capability
+   */
+  @Public
+  @Unstable
+  public abstract void setIncrAllocatableCapabilty(Resource incrResource);
+
+  /**
+   * @return the incremental allocatable capability
+   */
+  @Public
+  @Unstable
+  public abstract Resource getIncrAllocatableCapabilty();
+
+  /**
+   * @param interval the container token expiry interval
+   */
+  @Public
+  @Unstable
+  public abstract void setContainerTokenExpiryInterval(int interval);
+
+  /**
+   * @return the container token expiry interval
+   */
+  @Public
+  @Unstable
+  public abstract int getContainerTokenExpiryInterval();
+
+  /**
+   * @param containerIdStart the container id start value
+   */
+  @Public
+  @Unstable
+  public abstract void setContainerIdStart(long containerIdStart);
+
+  /**
+   * @return the container id start value
+   */
+  @Public
+  @Unstable
+  public abstract long getContainerIdStart();
+
+  /**
+   * @param nodesForScheduling the node ids to carry
+   */
+  @Public
+  @Unstable
+  public abstract void setNodesForScheduling(List<NodeId> nodesForScheduling);
+
+  /**
+   * @return the list of nodes for scheduling
+   */
+  @Public
+  @Unstable
+  public abstract List<NodeId> getNodesForScheduling();
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
new file mode 100644
index 0000000..3ea4965
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedAllocateResponsePBImpl.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+ .AllocateResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+ .DistSchedAllocateResponse;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class DistSchedAllocateResponsePBImpl extends DistSchedAllocateResponse {
+
+ YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto =
+ YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.getDefaultInstance();
+ YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private AllocateResponse allocateResponse;
+ private List<NodeId> nodesForScheduling;
+
+ public DistSchedAllocateResponsePBImpl() {
+ builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder();
+ }
+
+ public DistSchedAllocateResponsePBImpl(YarnServerCommonServiceProtos.DistSchedAllocateResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public YarnServerCommonServiceProtos.DistSchedAllocateResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = YarnServerCommonServiceProtos.DistSchedAllocateResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private synchronized void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private synchronized void mergeLocalToBuilder() {
+ if (this.nodesForScheduling != null) {
+ builder.clearNodesForScheduling();
+ Iterable<YarnProtos.NodeIdProto> iterable =
+ getNodeIdProtoIterable(this.nodesForScheduling);
+ builder.addAllNodesForScheduling(iterable);
+ }
+ if (this.allocateResponse != null) {
+ builder.setAllocateResponse(
+ ((AllocateResponsePBImpl)this.allocateResponse).getProto());
+ }
+ }
+ @Override
+ public void setAllocateResponse(AllocateResponse response) {
+ maybeInitBuilder();
+ if(allocateResponse == null) {
+ builder.clearAllocateResponse();
+ }
+ this.allocateResponse = response;
+ }
+
+ @Override
+ public AllocateResponse getAllocateResponse() {
+ if (this.allocateResponse != null) {
+ return this.allocateResponse;
+ }
+
+ YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ if (!p.hasAllocateResponse()) {
+ return null;
+ }
+
+ this.allocateResponse =
+ new AllocateResponsePBImpl(p.getAllocateResponse());
+ return this.allocateResponse;
+ }
+
+ @Override
+ public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+ maybeInitBuilder();
+ if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+ if (this.nodesForScheduling != null) {
+ this.nodesForScheduling.clear();
+ }
+ builder.clearNodesForScheduling();
+ return;
+ }
+ this.nodesForScheduling = new ArrayList<>();
+ this.nodesForScheduling.addAll(nodesForScheduling);
+ }
+
+ @Override
+ public List<NodeId> getNodesForScheduling() {
+ if (nodesForScheduling != null) {
+ return nodesForScheduling;
+ }
+ initLocalNodesForSchedulingList();
+ return nodesForScheduling;
+ }
+
+ private synchronized void initLocalNodesForSchedulingList() {
+ YarnServerCommonServiceProtos.DistSchedAllocateResponseProtoOrBuilder p =
+ viaProto ? proto : builder;
+ List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+ nodesForScheduling = new ArrayList<>();
+ if (list != null) {
+ for (YarnProtos.NodeIdProto t : list) {
+ nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+ }
+ }
+ }
+ private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+ final List<NodeId> nodeList) {
+ maybeInitBuilder();
+ return new Iterable<YarnProtos.NodeIdProto>() {
+ @Override
+ public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+ return new Iterator<YarnProtos.NodeIdProto>() {
+
+ Iterator<NodeId> iter = nodeList.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public YarnProtos.NodeIdProto next() {
+ return ProtoUtils.convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
new file mode 100644
index 0000000..0322c70
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/DistSchedRegisterResponsePBImpl.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import com.google.protobuf.TextFormat;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
+ .RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+ .DistSchedRegisterResponse;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class DistSchedRegisterResponsePBImpl extends DistSchedRegisterResponse {
+
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto =
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.getDefaultInstance();
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private Resource maxAllocatableCapability;
+ private Resource minAllocatableCapability;
+ private Resource incrAllocatableCapability;
+ private List<NodeId> nodesForScheduling;
+ private RegisterApplicationMasterResponse registerApplicationMasterResponse;
+
+ public DistSchedRegisterResponsePBImpl() {
+ builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder();
+ }
+
+ public DistSchedRegisterResponsePBImpl(YarnServerCommonServiceProtos.DistSchedRegisterResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public YarnServerCommonServiceProtos.DistSchedRegisterResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = YarnServerCommonServiceProtos.DistSchedRegisterResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private synchronized void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private synchronized void mergeLocalToBuilder() {
+ if (this.nodesForScheduling != null) {
+ builder.clearNodesForScheduling();
+ Iterable<YarnProtos.NodeIdProto> iterable =
+ getNodeIdProtoIterable(this.nodesForScheduling);
+ builder.addAllNodesForScheduling(iterable);
+ }
+ if (this.maxAllocatableCapability != null) {
+ builder.setMaxAllocCapability(
+ ProtoUtils.convertToProtoFormat(this.maxAllocatableCapability));
+ }
+ if (this.minAllocatableCapability != null) {
+ builder.setMaxAllocCapability(
+ ProtoUtils.convertToProtoFormat(this.minAllocatableCapability));
+ }
+ if (this.registerApplicationMasterResponse != null) {
+ builder.setRegisterResponse(
+ ((RegisterApplicationMasterResponsePBImpl)
+ this.registerApplicationMasterResponse).getProto());
+ }
+ }
+
+ @Override
+ public void setRegisterResponse(RegisterApplicationMasterResponse resp) {
+ maybeInitBuilder();
+ if(registerApplicationMasterResponse == null) {
+ builder.clearRegisterResponse();
+ }
+ this.registerApplicationMasterResponse = resp;
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse getRegisterResponse() {
+ if (this.registerApplicationMasterResponse != null) {
+ return this.registerApplicationMasterResponse;
+ }
+
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasRegisterResponse()) {
+ return null;
+ }
+
+ this.registerApplicationMasterResponse =
+ new RegisterApplicationMasterResponsePBImpl(p.getRegisterResponse());
+ return this.registerApplicationMasterResponse;
+ }
+
+ @Override
+ public void setMaxAllocatableCapabilty(Resource maxResource) {
+ maybeInitBuilder();
+ if(maxAllocatableCapability == null) {
+ builder.clearMaxAllocCapability();
+ }
+ this.maxAllocatableCapability = maxResource;
+ }
+
+ @Override
+ public Resource getMaxAllocatableCapabilty() {
+ if (this.maxAllocatableCapability != null) {
+ return this.maxAllocatableCapability;
+ }
+
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasMaxAllocCapability()) {
+ return null;
+ }
+
+ this.maxAllocatableCapability =
+ ProtoUtils.convertFromProtoFormat(p.getMaxAllocCapability());
+ return this.maxAllocatableCapability;
+ }
+
+ @Override
+ public void setMinAllocatableCapabilty(Resource minResource) {
+ maybeInitBuilder();
+ if(minAllocatableCapability == null) {
+ builder.clearMinAllocCapability();
+ }
+ this.minAllocatableCapability = minResource;
+ }
+
+ @Override
+ public Resource getMinAllocatableCapabilty() {
+ if (this.minAllocatableCapability != null) {
+ return this.minAllocatableCapability;
+ }
+
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasMinAllocCapability()) {
+ return null;
+ }
+
+ this.minAllocatableCapability =
+ ProtoUtils.convertFromProtoFormat(p.getMinAllocCapability());
+ return this.minAllocatableCapability;
+ }
+
+ @Override
+ public void setIncrAllocatableCapabilty(Resource incrResource) {
+ maybeInitBuilder();
+ if(incrAllocatableCapability == null) {
+ builder.clearIncrAllocCapability();
+ }
+ this.incrAllocatableCapability = incrResource;
+ }
+
+ @Override
+ public Resource getIncrAllocatableCapabilty() {
+ if (this.incrAllocatableCapability != null) {
+ return this.incrAllocatableCapability;
+ }
+
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasIncrAllocCapability()) {
+ return null;
+ }
+
+ this.incrAllocatableCapability =
+ ProtoUtils.convertFromProtoFormat(p.getIncrAllocCapability());
+ return this.incrAllocatableCapability;
+ }
+
+ @Override
+ public void setContainerTokenExpiryInterval(int interval) {
+ maybeInitBuilder();
+ builder.setContainerTokenExpiryInterval(interval);
+ }
+
+ @Override
+ public int getContainerTokenExpiryInterval() {
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasContainerTokenExpiryInterval()) {
+ return 0;
+ }
+ return p.getContainerTokenExpiryInterval();
+ }
+
+ @Override
+ public void setContainerIdStart(long containerIdStart) {
+ maybeInitBuilder();
+ builder.setContainerIdStart(containerIdStart);
+ }
+
+ @Override
+ public long getContainerIdStart() {
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasContainerIdStart()) {
+ return 0;
+ }
+ return p.getContainerIdStart();
+ }
+
+
+ @Override
+ public void setNodesForScheduling(List<NodeId> nodesForScheduling) {
+ maybeInitBuilder();
+ if (nodesForScheduling == null || nodesForScheduling.isEmpty()) {
+ if (this.nodesForScheduling != null) {
+ this.nodesForScheduling.clear();
+ }
+ builder.clearNodesForScheduling();
+ return;
+ }
+ this.nodesForScheduling = new ArrayList<>();
+ this.nodesForScheduling.addAll(nodesForScheduling);
+ }
+
+ @Override
+ public List<NodeId> getNodesForScheduling() {
+ if (nodesForScheduling != null) {
+ return nodesForScheduling;
+ }
+ initLocalNodesForSchedulingList();
+ return nodesForScheduling;
+ }
+
+ private synchronized void initLocalNodesForSchedulingList() {
+ YarnServerCommonServiceProtos.DistSchedRegisterResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<YarnProtos.NodeIdProto> list = p.getNodesForSchedulingList();
+ nodesForScheduling = new ArrayList<>();
+ if (list != null) {
+ for (YarnProtos.NodeIdProto t : list) {
+ nodesForScheduling.add(ProtoUtils.convertFromProtoFormat(t));
+ }
+ }
+ }
+ private synchronized Iterable<YarnProtos.NodeIdProto> getNodeIdProtoIterable(
+ final List<NodeId> nodeList) {
+ maybeInitBuilder();
+ return new Iterable<YarnProtos.NodeIdProto>() {
+ @Override
+ public synchronized Iterator<YarnProtos.NodeIdProto> iterator() {
+ return new Iterator<YarnProtos.NodeIdProto>() {
+
+ Iterator<NodeId> iter = nodeList.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public YarnProtos.NodeIdProto next() {
+ return ProtoUtils.convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
new file mode 100644
index 0000000..7e3a77f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/distributed_scheduler_protocol.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are public and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "DistributedSchedulerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_service_protos.proto";
+import "yarn_server_common_service_protos.proto";
+
+
+// RPC service with the register and allocate calls for distributed
+// scheduling. Both calls take the regular application-master request
+// messages and reply with the DistSched* response messages.
+service DistributedSchedulerProtocolService {
+ // Register call: RegisterApplicationMasterRequestProto in,
+ // DistSchedRegisterResponseProto out.
+ rpc registerApplicationMasterForDistributedScheduling (RegisterApplicationMasterRequestProto) returns (DistSchedRegisterResponseProto);
+ // Allocate call: AllocateRequestProto in, DistSchedAllocateResponseProto out.
+ rpc allocateForDistributedScheduling (AllocateRequestProto) returns (DistSchedAllocateResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index a54bbdb..786d8ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -26,6 +26,21 @@ import "yarn_protos.proto";
import "yarn_server_common_protos.proto";
import "yarn_service_protos.proto";
+// Reply of registerApplicationMasterForDistributedScheduling: embeds the
+// regular register response and adds the extra fields below.
+message DistSchedRegisterResponseProto {
+ // The regular register response being wrapped.
+ optional RegisterApplicationMasterResponseProto register_response = 1;
+ // Maximum, minimum and incremental allocatable capabilities.
+ optional ResourceProto max_alloc_capability = 2;
+ optional ResourceProto min_alloc_capability = 3;
+ optional ResourceProto incr_alloc_capability = 4;
+ // NOTE(review): the time unit of the interval is not stated here - confirm.
+ optional int32 container_token_expiry_interval = 5;
+ optional int64 container_id_start = 6;
+ repeated NodeIdProto nodes_for_scheduling = 7;
+}
+
+// Reply of allocateForDistributedScheduling: embeds the regular allocate
+// response and adds a list of node ids for scheduling.
+message DistSchedAllocateResponseProto {
+ optional AllocateResponseProto allocate_response = 1;
+ repeated NodeIdProto nodes_for_scheduling = 2;
+}
+
message NodeLabelsProto {
repeated NodeLabelProto nodeLabels = 1;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 9c2d1fb..e0a4da4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -87,4 +88,8 @@ public interface Context {
ConcurrentLinkedQueue<LogAggregationReport>
getLogAggregationStatusForApps();
+
+ /**
+  * @return whether distributed scheduling is enabled for this context
+  */
+ boolean isDistributedSchedulingEnabled();
+
+ /**
+  * @return the {@link OpportunisticContainerAllocator} held by this context
+  */
+ OpportunisticContainerAllocator getContainerAllocator();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a9a5411..ef7b760 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ScriptBasedNodeLabel
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
@@ -187,9 +188,9 @@ public class NodeManager extends CompositeService
protected NMContext createNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
- NMStateStoreService stateStore) {
+ NMStateStoreService stateStore, boolean isDistSchedulerEnabled) {
return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
- dirsHandler, aclsManager, stateStore);
+ dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled);
}
protected void doSecureLogin() throws IOException {
@@ -310,8 +311,12 @@ public class NodeManager extends CompositeService
getNodeHealthScriptRunner(conf), dirsHandler);
addService(nodeHealthChecker);
+ boolean isDistSchedulingEnabled =
+ conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
+ YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
+
this.context = createNMContext(containerTokenSecretManager,
- nmTokenSecretManager, nmStore);
+ nmTokenSecretManager, nmStore, isDistSchedulingEnabled);
nodeLabelsProvider = createNodeLabelsProvider(conf);
@@ -340,6 +345,10 @@ public class NodeManager extends CompositeService
addService(webServer);
((NMContext) context).setWebServer(webServer);
+ ((NMContext) context).setQueueableContainerAllocator(
+ new OpportunisticContainerAllocator(nodeStatusUpdater, context,
+ webServer.getPort()));
+
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);
addService(dispatcher);
@@ -458,11 +467,14 @@ public class NodeManager extends CompositeService
private boolean isDecommissioned = false;
private final ConcurrentLinkedQueue<LogAggregationReport>
logAggregationReportForApps;
+ private final boolean isDistSchedulingEnabled;
+
+ private OpportunisticContainerAllocator containerAllocator;
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
- NMStateStoreService stateStore) {
+ NMStateStoreService stateStore, boolean isDistSchedulingEnabled) {
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.dirsHandler = dirsHandler;
@@ -473,6 +485,7 @@ public class NodeManager extends CompositeService
this.stateStore = stateStore;
this.logAggregationReportForApps = new ConcurrentLinkedQueue<
LogAggregationReport>();
+ this.isDistSchedulingEnabled = isDistSchedulingEnabled;
}
/**
@@ -585,6 +598,21 @@ public class NodeManager extends CompositeService
getLogAggregationStatusForApps() {
return this.logAggregationReportForApps;
}
+
+ @Override
+ public boolean isDistributedSchedulingEnabled() {
+ return isDistSchedulingEnabled;
+ }
+
+ public void setQueueableContainerAllocator(
+ OpportunisticContainerAllocator containerAllocator) {
+ this.containerAllocator = containerAllocator;
+ }
+
+ @Override
+ public OpportunisticContainerAllocator getContainerAllocator() {
+ return containerAllocator;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index bd6538c..67bb52b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+
+import org.apache.hadoop.yarn.server.nodemanager.scheduler.LocalScheduler;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
@@ -464,6 +466,12 @@ public class AMRMProxyService extends AbstractService implements
interceptorClassNames.add(item.trim());
}
+ // Make sure LocalScheduler is present at the beginning
+ // of the chain.
+ if (this.nmContext.isDistributedSchedulingEnabled()) {
+ interceptorClassNames.add(0, LocalScheduler.class.getName());
+ }
+
return interceptorClassNames;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
index 810dfa8..2cf185f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
@@ -21,6 +21,14 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords
+ .RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
+
+import java.io.IOException;
/**
* Implements the RequestInterceptor interface and provides common functionality
@@ -99,4 +107,38 @@ public abstract class AbstractRequestInterceptor implements
public AMRMProxyApplicationContext getApplicationContext() {
return this.appContext;
}
+
+ /**
+ * Default implementation of the distributed scheduling version of the
+ * allocate method; it delegates to the next interceptor in the chain.
+ *
+ * @param request ApplicationMaster allocate request
+ * @return the next interceptor's response, or null if there is none
+ * @throws YarnException propagated from the next interceptor
+ * @throws IOException propagated from the next interceptor
+ */
+ @Override
+ public DistSchedAllocateResponse allocateForDistributedScheduling
+ (AllocateRequest request) throws YarnException, IOException {
+ return (this.nextInterceptor != null) ?
+ this.nextInterceptor.allocateForDistributedScheduling(request) : null;
+ }
+
+ /**
+ * Default implementation of the distributed scheduling version of the
+ * register method; it delegates to the next interceptor in the chain.
+ *
+ * @param request ApplicationMaster registration request
+ * @return the next interceptor's response, or null if there is none
+ * @throws YarnException propagated from the next interceptor
+ * @throws IOException propagated from the next interceptor
+ */
+ @Override
+ public DistSchedRegisterResponse
+ registerApplicationMasterForDistributedScheduling
+ (RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return (this.nextInterceptor != null) ? this.nextInterceptor
+ .registerApplicationMasterForDistributedScheduling(request) : null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index 2c7939b..5e10d03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -20,10 +20,15 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import com.google.common.base.Joiner;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -33,9 +38,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
+import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords
+ .DistSchedAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedRegisterResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +60,7 @@ public final class DefaultRequestInterceptor extends
AbstractRequestInterceptor {
private static final Logger LOG = LoggerFactory
.getLogger(DefaultRequestInterceptor.class);
- private ApplicationMasterProtocol rmClient;
+ private DistributedSchedulerProtocol rmClient;
private UserGroupInformation user = null;
@Override
@@ -63,11 +75,12 @@ public final class DefaultRequestInterceptor extends
final Configuration conf = this.getConf();
rmClient =
- user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+ user.doAs(new PrivilegedExceptionAction<DistributedSchedulerProtocol>() {
@Override
- public ApplicationMasterProtocol run() throws Exception {
- return ClientRMProxy.createRMProxy(conf,
- ApplicationMasterProtocol.class);
+ public DistributedSchedulerProtocol run() throws Exception {
+ setAMRMTokenService(conf);
+ return ServerRMProxy.createRMProxy(conf,
+ DistributedSchedulerProtocol.class);
}
});
} catch (IOException e) {
@@ -108,6 +121,32 @@ public final class DefaultRequestInterceptor extends
}
@Override
+ public DistSchedRegisterResponse
+ registerApplicationMasterForDistributedScheduling
+ (RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
+ " request to the real YARN RM"); // note the leading space
+ return rmClient.registerApplicationMasterForDistributedScheduling(request);
+ }
+
+ @Override
+ public DistSchedAllocateResponse allocateForDistributedScheduling
+ (AllocateRequest request) throws YarnException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Forwarding allocateForDistributedScheduling request" +
+ " to the real YARN RM"); // note the leading space
+ }
+ DistSchedAllocateResponse allocateResponse =
+ rmClient.allocateForDistributedScheduling(request);
+ if (allocateResponse.getAllocateResponse().getAMRMToken() != null) {
+ updateAMRMToken(allocateResponse.getAllocateResponse().getAMRMToken());
+ }
+
+ return allocateResponse;
+ }
+
+ @Override
public FinishApplicationMasterResponse finishApplicationMaster(
final FinishApplicationMasterRequest request) throws YarnException,
IOException {
@@ -135,4 +174,43 @@ public final class DefaultRequestInterceptor extends
user.addToken(amrmToken);
amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
}
+
+ private static void setAMRMTokenService(final Configuration conf)
+ throws IOException {
+ for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : UserGroupInformation
+ .getCurrentUser().getTokens()) {
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ token.setService(getAMRMTokenService(conf));
+ }
+ }
+ }
+
+ @InterfaceStability.Unstable
+ public static Text getAMRMTokenService(Configuration conf) {
+ return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+ }
+
+ @InterfaceStability.Unstable
+ public static Text getTokenService(Configuration conf, String address,
+ String defaultAddr, int defaultPort) {
+ if (HAUtil.isHAEnabled(conf)) {
+ // Build a list of service addresses to form the service name
+ ArrayList<String> services = new ArrayList<String>();
+ YarnConfiguration yarnConf = new YarnConfiguration(conf);
+ for (String rmId : HAUtil.getRMHAIds(conf)) {
+ // Set RM_ID to get the corresponding RM_ADDRESS
+ yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
+ services.add(SecurityUtil.buildTokenService(
+ yarnConf.getSocketAddr(address, defaultAddr, defaultPort))
+ .toString());
+ }
+ return new Text(Joiner.on(',').join(services));
+ }
+
+ // Non-HA case - no need to set RM_ID
+ return SecurityUtil.buildTokenService(conf.getSocketAddr(address,
+ defaultAddr, defaultPort));
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3006e23e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
index c74c88f..8ec2b4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
@@ -20,13 +20,14 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
/**
* Defines the contract to be implemented by the request intercepter classes,
* that can be used to intercept and inspect messages sent from the application
* master to the resource manager.
*/
-public interface RequestInterceptor extends ApplicationMasterProtocol,
+public interface RequestInterceptor extends DistributedSchedulerProtocol,
Configurable {
/**
* This method is called for initializing the intercepter. This is guaranteed