You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:45:48 UTC
[06/50] [abbrv] flink git commit: [FLINK-4529] [flip-6] Move
TaskExecutor, JobMaster and ResourceManager out of the rpc package
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
new file mode 100644
index 0000000..896421b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Describes the requirements of a requested slot (job, allocation id, resource profile); mainly used by the JobManager when requesting a slot from the ResourceManager.
+ */
+public class SlotRequest implements Serializable {
+
+ private static final long serialVersionUID = -6586877187990445986L;
+
+ /** The ID of the job the slot is requested for */
+ private final JobID jobId;
+
+ /** The allocation ID that uniquely identifies this request */
+ private final AllocationID allocationId;
+
+ /** The resource profile required of the requested slot */
+ private final ResourceProfile resourceProfile;
+
+ public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) {
+ this.jobId = checkNotNull(jobId);
+ this.allocationId = checkNotNull(allocationId);
+ this.resourceProfile = checkNotNull(resourceProfile);
+ }
+
+ /**
+ * Gets the ID of the job the slot is requested for.
+ * @return The job id
+ */
+ public JobID getJobId() {
+ return jobId;
+ }
+
+ /**
+ * Gets the allocation ID that uniquely identifies this request.
+ * @return The allocation id
+ */
+ public AllocationID getAllocationId() {
+ return allocationId;
+ }
+
+ /**
+ * Gets the resource profile of the desired slot.
+ * @return The resource profile
+ */
+ public ResourceProfile getResourceProfile() {
+ return resourceProfile;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
deleted file mode 100644
index a046cb8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.jobmaster;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.util.Preconditions;
-
-import java.util.UUID;
-
-/**
- * JobMaster implementation. The job master is responsible for the execution of a single
- * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
- * <p>
- * It offers the following methods as part of its rpc interface to interact with the JobMaster
- * remotely:
- * <ul>
- * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
- * given task</li>
- * </ul>
- */
-public class JobMaster extends RpcEndpoint<JobMasterGateway> {
-
- /** Gateway to connected resource manager, null iff not connected */
- private ResourceManagerGateway resourceManager = null;
-
- /** Logical representation of the job */
- private final JobGraph jobGraph;
- private final JobID jobID;
-
- /** Configuration of the job */
- private final Configuration configuration;
-
- /** Service to contend for and retrieve the leadership of JM and RM */
- private final HighAvailabilityServices highAvailabilityServices;
-
- /** Leader Management */
- private LeaderElectionService leaderElectionService = null;
- private UUID leaderSessionID;
-
- /**
- * The JM's Constructor
- *
- * @param jobGraph The representation of the job's execution plan
- * @param configuration The job's configuration
- * @param rpcService The RPC service at which the JM serves
- * @param highAvailabilityService The cluster's HA service from the JM can elect and retrieve leaders.
- */
- public JobMaster(
- JobGraph jobGraph,
- Configuration configuration,
- RpcService rpcService,
- HighAvailabilityServices highAvailabilityService) {
-
- super(rpcService);
-
- this.jobGraph = Preconditions.checkNotNull(jobGraph);
- this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
-
- this.configuration = Preconditions.checkNotNull(configuration);
-
- this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService);
- }
-
- public ResourceManagerGateway getResourceManager() {
- return resourceManager;
- }
-
- //----------------------------------------------------------------------------------------------
- // Initialization methods
- //----------------------------------------------------------------------------------------------
- public void start() {
- super.start();
-
- // register at the election once the JM starts
- registerAtElectionService();
- }
-
-
- //----------------------------------------------------------------------------------------------
- // JobMaster Leadership methods
- //----------------------------------------------------------------------------------------------
-
- /**
- * Retrieves the election service and contend for the leadership.
- */
- private void registerAtElectionService() {
- try {
- leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
- leaderElectionService.start(new JobMasterLeaderContender());
- } catch (Exception e) {
- throw new RuntimeException("Fail to register at the election of JobMaster", e);
- }
- }
-
- /**
- * Start the execution when the leadership is granted.
- *
- * @param newLeaderSessionID The identifier of the new leadership session
- */
- public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID);
-
- // The operation may be blocking, but since JM is idle before it grants the leadership, it's okay that
- // JM waits here for the operation's completeness.
- leaderSessionID = newLeaderSessionID;
- leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
-
- // TODO:: execute the job when the leadership is granted.
- }
- });
- }
-
- /**
- * Stop the execution when the leadership is revoked.
- */
- public void revokeJobMasterLeadership() {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("JobManager {} was revoked leadership.", getAddress());
-
- // TODO:: cancel the job's execution and notify all listeners
- cancelAndClearEverything(new Exception("JobManager is no longer the leader."));
-
- leaderSessionID = null;
- }
- });
- }
-
- /**
- * Handles error occurring in the leader election service
- *
- * @param exception Exception thrown in the leader election service
- */
- public void onJobMasterElectionError(final Exception exception) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("Received an error from the LeaderElectionService.", exception);
-
- // TODO:: cancel the job's execution and shutdown the JM
- cancelAndClearEverything(exception);
-
- leaderSessionID = null;
- }
- });
-
- }
-
- //----------------------------------------------------------------------------------------------
- // RPC methods
- //----------------------------------------------------------------------------------------------
-
- /**
- * Updates the task execution state for a given task.
- *
- * @param taskExecutionState New task execution state for a given task
- * @return Acknowledge the task execution state update
- */
- @RpcMethod
- public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
- System.out.println("TaskExecutionState: " + taskExecutionState);
- return Acknowledge.get();
- }
-
- /**
- * Triggers the registration of the job master at the resource manager.
- *
- * @param address Address of the resource manager
- */
- @RpcMethod
- public void registerAtResourceManager(final String address) {
- //TODO:: register at the RM
- }
-
- //----------------------------------------------------------------------------------------------
- // Helper methods
- //----------------------------------------------------------------------------------------------
-
- /**
- * Cancel the current job and notify all listeners the job's cancellation.
- *
- * @param cause Cause for the cancelling.
- */
- private void cancelAndClearEverything(Throwable cause) {
- // currently, nothing to do here
- }
-
- // ------------------------------------------------------------------------
- // Utility classes
- // ------------------------------------------------------------------------
- private class JobMasterLeaderContender implements LeaderContender {
-
- @Override
- public void grantLeadership(UUID leaderSessionID) {
- JobMaster.this.grantJobMasterLeadership(leaderSessionID);
- }
-
- @Override
- public void revokeLeadership() {
- JobMaster.this.revokeJobMasterLeadership();
- }
-
- @Override
- public String getAddress() {
- return JobMaster.this.getAddress();
- }
-
- @Override
- public void handleError(Exception exception) {
- onJobMasterElectionError(exception);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
deleted file mode 100644
index 17a4c3a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.jobmaster;
-
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import scala.concurrent.Future;
-
-/**
- * {@link JobMaster} rpc gateway interface
- */
-public interface JobMasterGateway extends RpcGateway {
-
- /**
- * Updates the task execution state for a given task.
- *
- * @param taskExecutionState New task execution state for a given task
- * @return Future acknowledge of the task execution state update
- */
- Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
-
- /**
- * Triggers the registration of the job master at the resource manager.
- *
- * @param address Address of the resource manager
- */
- void registerAtResourceManager(final String address);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
deleted file mode 100644
index 2de560a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import java.io.Serializable;
-
-/**
- * Base class for responses given to registration attempts from {@link RetryingRegistration}.
- */
-public abstract class RegistrationResponse implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // ----------------------------------------------------------------------------
-
- /**
- * Base class for a successful registration. Concrete registration implementations
- * will typically extend this class to attach more information.
- */
- public static class Success extends RegistrationResponse {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String toString() {
- return "Registration Successful";
- }
- }
-
- // ----------------------------------------------------------------------------
-
- /**
- * A rejected (declined) registration.
- */
- public static final class Decline extends RegistrationResponse {
- private static final long serialVersionUID = 1L;
-
- /** the rejection reason */
- private final String reason;
-
- /**
- * Creates a new rejection message.
- *
- * @param reason The reason for the rejection.
- */
- public Decline(String reason) {
- this.reason = reason != null ? reason : "(unknown)";
- }
-
- /**
- * Gets the reason for the rejection.
- */
- public String getReason() {
- return reason;
- }
-
- @Override
- public String toString() {
- return "Registration Declined (" + reason + ')';
- }
- }
-}
-
-
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
deleted file mode 100644
index dcb5011..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.slf4j.Logger;
-
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-
-/**
- * This utility class implements the basis of registering one component at another component,
- * for example registering the TaskExecutor at the ResourceManager.
- * This {@code RetryingRegistration} implements both the initial address resolution
- * and the retries-with-backoff strategy.
- *
- * <p>The registration gives access to a future that is completed upon successful registration.
- * The registration can be canceled, for example when the target where it tries to register
- * at looses leader status.
- *
- * @param <Gateway> The type of the gateway to connect to.
- * @param <Success> The type of the successful registration responses.
- */
-public abstract class RetryingRegistration<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> {
-
- // ------------------------------------------------------------------------
- // default configuration values
- // ------------------------------------------------------------------------
-
- /** default value for the initial registration timeout (milliseconds) */
- private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
-
- /** default value for the maximum registration timeout, after exponential back-off (milliseconds) */
- private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
-
- /** The pause (milliseconds) made after an registration attempt caused an exception (other than timeout) */
- private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
-
- /** The pause (milliseconds) made after the registration attempt was refused */
- private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
-
- // ------------------------------------------------------------------------
- // Fields
- // ------------------------------------------------------------------------
-
- private final Logger log;
-
- private final RpcService rpcService;
-
- private final String targetName;
-
- private final Class<Gateway> targetType;
-
- private final String targetAddress;
-
- private final UUID leaderId;
-
- private final Promise<Tuple2<Gateway, Success>> completionPromise;
-
- private final long initialRegistrationTimeout;
-
- private final long maxRegistrationTimeout;
-
- private final long delayOnError;
-
- private final long delayOnRefusedRegistration;
-
- private volatile boolean canceled;
-
- // ------------------------------------------------------------------------
-
- public RetryingRegistration(
- Logger log,
- RpcService rpcService,
- String targetName,
- Class<Gateway> targetType,
- String targetAddress,
- UUID leaderId) {
- this(log, rpcService, targetName, targetType, targetAddress, leaderId,
- INITIAL_REGISTRATION_TIMEOUT_MILLIS, MAX_REGISTRATION_TIMEOUT_MILLIS,
- ERROR_REGISTRATION_DELAY_MILLIS, REFUSED_REGISTRATION_DELAY_MILLIS);
- }
-
- public RetryingRegistration(
- Logger log,
- RpcService rpcService,
- String targetName,
- Class<Gateway> targetType,
- String targetAddress,
- UUID leaderId,
- long initialRegistrationTimeout,
- long maxRegistrationTimeout,
- long delayOnError,
- long delayOnRefusedRegistration) {
-
- checkArgument(initialRegistrationTimeout > 0, "initial registration timeout must be greater than zero");
- checkArgument(maxRegistrationTimeout > 0, "maximum registration timeout must be greater than zero");
- checkArgument(delayOnError >= 0, "delay on error must be non-negative");
- checkArgument(delayOnRefusedRegistration >= 0, "delay on refused registration must be non-negative");
-
- this.log = checkNotNull(log);
- this.rpcService = checkNotNull(rpcService);
- this.targetName = checkNotNull(targetName);
- this.targetType = checkNotNull(targetType);
- this.targetAddress = checkNotNull(targetAddress);
- this.leaderId = checkNotNull(leaderId);
- this.initialRegistrationTimeout = initialRegistrationTimeout;
- this.maxRegistrationTimeout = maxRegistrationTimeout;
- this.delayOnError = delayOnError;
- this.delayOnRefusedRegistration = delayOnRefusedRegistration;
-
- this.completionPromise = new DefaultPromise<>();
- }
-
- // ------------------------------------------------------------------------
- // completion and cancellation
- // ------------------------------------------------------------------------
-
- public Future<Tuple2<Gateway, Success>> getFuture() {
- return completionPromise.future();
- }
-
- /**
- * Cancels the registration procedure.
- */
- public void cancel() {
- canceled = true;
- }
-
- /**
- * Checks if the registration was canceled.
- * @return True if the registration was canceled, false otherwise.
- */
- public boolean isCanceled() {
- return canceled;
- }
-
- // ------------------------------------------------------------------------
- // registration
- // ------------------------------------------------------------------------
-
- protected abstract Future<RegistrationResponse> invokeRegistration(
- Gateway gateway, UUID leaderId, long timeoutMillis) throws Exception;
-
- /**
- * This method resolves the target address to a callable gateway and starts the
- * registration after that.
- */
- @SuppressWarnings("unchecked")
- public void startRegistration() {
- try {
- // trigger resolution of the resource manager address to a callable gateway
- Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
-
- // upon success, start the registration attempts
- resourceManagerFuture.onSuccess(new OnSuccess<Gateway>() {
- @Override
- public void onSuccess(Gateway result) {
- log.info("Resolved {} address, beginning registration", targetName);
- register(result, 1, initialRegistrationTimeout);
- }
- }, rpcService.getExecutionContext());
-
- // upon failure, retry, unless this is cancelled
- resourceManagerFuture.onFailure(new OnFailure() {
- @Override
- public void onFailure(Throwable failure) {
- if (!isCanceled()) {
- log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress);
- startRegistration();
- }
- }
- }, rpcService.getExecutionContext());
- }
- catch (Throwable t) {
- cancel();
- completionPromise.tryFailure(t);
- }
- }
-
- /**
- * This method performs a registration attempt and triggers either a success notification or a retry,
- * depending on the result.
- */
- @SuppressWarnings("unchecked")
- private void register(final Gateway gateway, final int attempt, final long timeoutMillis) {
- // eager check for canceling to avoid some unnecessary work
- if (canceled) {
- return;
- }
-
- try {
- log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
- Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
-
- // if the registration was successful, let the TaskExecutor know
- registrationFuture.onSuccess(new OnSuccess<RegistrationResponse>() {
-
- @Override
- public void onSuccess(RegistrationResponse result) throws Throwable {
- if (!isCanceled()) {
- if (result instanceof RegistrationResponse.Success) {
- // registration successful!
- Success success = (Success) result;
- completionPromise.success(new Tuple2<>(gateway, success));
- }
- else {
- // registration refused or unknown
- if (result instanceof RegistrationResponse.Decline) {
- RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
- log.info("Registration at {} was declined: {}", targetName, decline.getReason());
- } else {
- log.error("Received unknown response to registration attempt: " + result);
- }
-
- log.info("Pausing and re-attempting registration in {} ms", delayOnRefusedRegistration);
- registerLater(gateway, 1, initialRegistrationTimeout, delayOnRefusedRegistration);
- }
- }
- }
- }, rpcService.getExecutionContext());
-
- // upon failure, retry
- registrationFuture.onFailure(new OnFailure() {
- @Override
- public void onFailure(Throwable failure) {
- if (!isCanceled()) {
- if (failure instanceof TimeoutException) {
- // we simply have not received a response in time. maybe the timeout was
- // very low (initial fast registration attempts), maybe the target endpoint is
- // currently down.
- if (log.isDebugEnabled()) {
- log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
- targetName, targetAddress, attempt, timeoutMillis);
- }
-
- long newTimeoutMillis = Math.min(2 * timeoutMillis, maxRegistrationTimeout);
- register(gateway, attempt + 1, newTimeoutMillis);
- }
- else {
- // a serious failure occurred. we still should not give up, but keep trying
- log.error("Registration at " + targetName + " failed due to an error", failure);
- log.info("Pausing and re-attempting registration in {} ms", delayOnError);
-
- registerLater(gateway, 1, initialRegistrationTimeout, delayOnError);
- }
- }
- }
- }, rpcService.getExecutionContext());
- }
- catch (Throwable t) {
- cancel();
- completionPromise.tryFailure(t);
- }
- }
-
- private void registerLater(final Gateway gateway, final int attempt, final long timeoutMillis, long delay) {
- rpcService.scheduleRunnable(new Runnable() {
- @Override
- public void run() {
- register(gateway, attempt, timeoutMillis);
- }
- }, delay, TimeUnit.MILLISECONDS);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
deleted file mode 100644
index 7a2deae..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import java.io.Serializable;
-
-public class JobMasterRegistration implements Serializable {
- private static final long serialVersionUID = 8411214999193765202L;
-
- private final String address;
-
- public JobMasterRegistration(String address) {
- this.address = address;
- }
-
- public String getAddress() {
- return address;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
deleted file mode 100644
index 8ac9e49..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.runtime.instance.InstanceID;
-
-import java.io.Serializable;
-
-public class RegistrationResponse implements Serializable {
- private static final long serialVersionUID = -2379003255993119993L;
-
- private final boolean isSuccess;
- private final InstanceID instanceID;
-
- public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
- this.isSuccess = isSuccess;
- this.instanceID = instanceID;
- }
-
- public boolean isSuccess() {
- return isSuccess;
- }
-
- public InstanceID getInstanceID() {
- return instanceID;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
deleted file mode 100644
index f7147c9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import akka.dispatch.Mapper;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
-
-import scala.concurrent.Future;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * ResourceManager implementation. The resource manager is responsible for resource de-/allocation
- * and bookkeeping.
- *
- * It offers the following methods as part of its rpc interface to interact with it remotely:
- * <ul>
- * <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
- * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
- * </ul>
- */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
- private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
- private final HighAvailabilityServices highAvailabilityServices;
- private LeaderElectionService leaderElectionService = null;
- private UUID leaderSessionID = null;
-
- public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
- super(rpcService);
- this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
- this.jobMasterGateways = new HashMap<>();
- }
-
- @Override
- public void start() {
- // start the rpc endpoint first, then the leader election service
- try {
- super.start();
- leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
- leaderElectionService.start(new ResourceManagerLeaderContender());
- } catch (Throwable e) {
- log.error("A fatal error happened when starting the ResourceManager", e);
- throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
- }
- }
-
- @Override
- public void shutDown() {
- try {
- leaderElectionService.stop();
- super.shutDown();
- } catch(Throwable e) {
- log.error("A fatal error happened when shutdown the ResourceManager", e);
- throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
- }
- }
-
- /**
- * Gets the leader session id of current resourceManager.
- *
- * @return the leader session ID of this resourceManager, or null while it does not hold leadership (before being granted it, or after it was revoked).
- */
- @VisibleForTesting
- UUID getLeaderSessionID() {
- return leaderSessionID;
- }
-
- /**
- * Register a {@link JobMaster} at the resource manager.
- *
- * @param jobMasterRegistration Job master registration information
- * @return Future registration response
- */
- @RpcMethod
- public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
- Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
-
- return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
- @Override
- public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
- InstanceID instanceID;
-
- if (jobMasterGateways.containsKey(jobMasterGateway)) {
- instanceID = jobMasterGateways.get(jobMasterGateway);
- } else {
- instanceID = new InstanceID();
- jobMasterGateways.put(jobMasterGateway, instanceID);
- }
-
- return new RegistrationResponse(true, instanceID);
- }
- }, getMainThreadExecutionContext());
- }
-
- /**
- * Requests a slot from the resource manager.
- *
- * @param slotRequest Slot request
- * @return Slot assignment
- */
- @RpcMethod
- public SlotAssignment requestSlot(SlotRequest slotRequest) {
- System.out.println("SlotRequest: " + slotRequest);
- return new SlotAssignment();
- }
-
-
- /**
- * Register a TaskExecutor at the resource manager.
- * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
- * @param taskExecutorAddress The address of the TaskExecutor that registers
- * @param resourceID The resource ID of the TaskExecutor that registers
- *
- * @return The response by the ResourceManager.
- */
- @RpcMethod
- public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
- UUID resourceManagerLeaderId,
- String taskExecutorAddress,
- ResourceID resourceID) {
-
- return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
- }
-
- private class ResourceManagerLeaderContender implements LeaderContender {
-
- /**
- * Callback method when current resourceManager is granted leadership
- *
- * @param leaderSessionID unique leadershipID
- */
- @Override
- public void grantLeadership(final UUID leaderSessionID) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
- ResourceManager.this.leaderSessionID = leaderSessionID;
- // note: confirming the leader session ID might be a blocking call
- leaderElectionService.confirmLeaderSessionID(leaderSessionID);
- }
- });
- }
-
- /**
- * Callback method when the current resourceManager loses leadership.
- */
- @Override
- public void revokeLeadership() {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("ResourceManager {} was revoked leadership.", getAddress());
- jobMasterGateways.clear();
- leaderSessionID = null;
- }
- });
- }
-
- @Override
- public String getAddress() {
- return ResourceManager.this.getAddress();
- }
-
- /**
- * Handles error occurring in the leader election service
- *
- * @param exception Exception being thrown in the leader election service
- */
- @Override
- public void handleError(final Exception exception) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.error("ResourceManager received an error from the LeaderElectionService.", exception);
- // terminate ResourceManager in case of an error
- shutDown();
- }
- });
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
deleted file mode 100644
index afddb01..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-
-/**
- * The {@link ResourceManager}'s RPC gateway interface.
- */
-public interface ResourceManagerGateway extends RpcGateway {
-
- /**
- * Register a {@link JobMaster} at the resource manager.
- *
- * @param jobMasterRegistration Job master registration information
- * @param timeout Timeout for the future to complete
- * @return Future registration response
- */
- Future<RegistrationResponse> registerJobMaster(
- JobMasterRegistration jobMasterRegistration,
- @RpcTimeout FiniteDuration timeout);
-
- /**
- * Register a {@link JobMaster} at the resource manager.
- *
- * @param jobMasterRegistration Job master registration information
- * @return Future registration response
- */
- Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
-
- /**
- * Requests a slot from the resource manager.
- *
- * @param slotRequest Slot request
- * @return Future slot assignment
- */
- Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
-
- /**
- * Register a TaskExecutor at the resource manager.
- * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
- * @param taskExecutorAddress The address of the TaskExecutor that registers
- * @param resourceID The resource ID of the TaskExecutor that registers
- * @param timeout The timeout for the response.
- *
- * @return The future to the response by the ResourceManager.
- */
- Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
- UUID resourceManagerLeaderId,
- String taskExecutorAddress,
- ResourceID resourceID,
- @RpcTimeout FiniteDuration timeout);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
deleted file mode 100644
index 86cd8b7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import java.io.Serializable;
-
-public class SlotAssignment implements Serializable{
- private static final long serialVersionUID = -6990813455942742322L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
deleted file mode 100644
index 74c7c39..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Describes the requirements of a slot; mainly used by the JobManager when requesting a slot from the ResourceManager.
- */
-public class SlotRequest implements Serializable {
-
- private static final long serialVersionUID = -6586877187990445986L;
-
- /** The ID of the job the slot is requested for */
- private final JobID jobId;
-
- /** The unique identification of this request */
- private final AllocationID allocationId;
-
- /** The resource profile of the required slot */
- private final ResourceProfile resourceProfile;
-
- public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) {
- this.jobId = checkNotNull(jobId);
- this.allocationId = checkNotNull(allocationId);
- this.resourceProfile = checkNotNull(resourceProfile);
- }
-
- /**
- * Get the ID of the job the slot is requested for.
- * @return The job id
- */
- public JobID getJobId() {
- return jobId;
- }
-
- /**
- * Get the unique identification of this request
- * @return the allocation id
- */
- public AllocationID getAllocationId() {
- return allocationId;
- }
-
- /**
- * Get the resource profile of the desired slot
- * @return The resource profile
- */
- public ResourceProfile getResourceProfile() {
- return resourceProfile;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
deleted file mode 100644
index c372ecb..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A report about the current status of all slots of the TaskExecutor, describing
- * which slots are available and allocated, and what jobs (JobManagers) the allocated slots
- * have been allocated to.
- */
-public class SlotReport implements Serializable {
-
- private static final long serialVersionUID = -3150175198722481689L;
-
- /** The slots status of the TaskManager */
- private final List<SlotStatus> slotsStatus;
-
- /** The resource id which identifies the TaskManager */
- private final ResourceID resourceID;
-
- public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID resourceID) {
- this.slotsStatus = checkNotNull(slotsStatus);
- this.resourceID = checkNotNull(resourceID);
- }
-
- public List<SlotStatus> getSlotsStatus() {
- return slotsStatus;
- }
-
- public ResourceID getResourceID() {
- return resourceID;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
deleted file mode 100644
index e8e2084..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Describes the current status of a slot located in a TaskManager.
- */
-public class SlotStatus implements Serializable {
-
- private static final long serialVersionUID = 5099191707339664493L;
-
- /** slotID to identify a slot */
- private final SlotID slotID;
-
- /** the resource profile of the slot */
- private final ResourceProfile profiler;
-
- /** if the slot is allocated, allocationId identifies its allocation; otherwise allocationId is null */
- private final AllocationID allocationID;
-
- /** if the slot is allocated, jobId identifies the job this slot is allocated to; otherwise jobId is null */
- private final JobID jobID;
-
- public SlotStatus(SlotID slotID, ResourceProfile profiler) {
- this(slotID, profiler, null, null);
- }
-
- public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) {
- this.slotID = checkNotNull(slotID, "slotID cannot be null");
- this.profiler = checkNotNull(profiler, "profile cannot be null");
- this.allocationID = allocationID;
- this.jobID = jobID;
- }
-
- /**
- * Get the unique identification of this slot
- *
- * @return The slot id
- */
- public SlotID getSlotID() {
- return slotID;
- }
-
- /**
- * Get the resource profile of this slot
- *
- * @return The resource profile
- */
- public ResourceProfile getProfiler() {
- return profiler;
- }
-
- /**
- * Get the allocation id of this slot
- *
- * @return The allocation id if this slot is allocated, otherwise null
- */
- public AllocationID getAllocationID() {
- return allocationID;
- }
-
- /**
- * Get the id of the job this slot is allocated to
- *
- * @return The job id if this slot is allocated, otherwise null
- */
- public JobID getJobID() {
- return jobID;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- SlotStatus that = (SlotStatus) o;
-
- if (!slotID.equals(that.slotID)) {
- return false;
- }
- if (!profiler.equals(that.profiler)) {
- return false;
- }
- if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) {
- return false;
- }
- return jobID != null ? jobID.equals(that.jobID) : that.jobID == null;
-
- }
-
- @Override
- public int hashCode() {
- int result = slotID.hashCode();
- result = 31 * result + profiler.hashCode();
- result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0);
- result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
- return result;
- }
-
-}