You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/14 13:45:48 UTC

[06/50] [abbrv] flink git commit: [FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of the rpc package

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
new file mode 100644
index 0000000..896421b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This describes the requirement of the slot, mainly used by JobManager requesting slot from ResourceManager.
+ */
+public class SlotRequest implements Serializable {
+
+	private static final long serialVersionUID = -6586877187990445986L;
+
+	/** The JobID of the slot requested for */
+	private final JobID jobId;
+
+	/** The unique identification of this request */
+	private final AllocationID allocationId;
+
+	/** The resource profile of the required slot */
+	private final ResourceProfile resourceProfile;
+
+	public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) {
+		this.jobId = checkNotNull(jobId);
+		this.allocationId = checkNotNull(allocationId);
+		this.resourceProfile = checkNotNull(resourceProfile);
+	}
+
+	/**
+	 * Get the JobID of the slot requested for.
+	 * @return The job id
+	 */
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	/**
+	 * Get the unique identification of this request
+	 * @return the allocation id
+	 */
+	public AllocationID getAllocationId() {
+		return allocationId;
+	}
+
+	/**
+	 * Get the resource profile of the desired slot
+	 * @return The resource profile
+	 */
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
deleted file mode 100644
index a046cb8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.jobmaster;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.util.Preconditions;
-
-import java.util.UUID;
-
-/**
- * JobMaster implementation. The job master is responsible for the execution of a single
- * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
- * <p>
- * It offers the following methods as part of its rpc interface to interact with the JobMaster
- * remotely:
- * <ul>
- *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
- * given task</li>
- * </ul>
- */
-public class JobMaster extends RpcEndpoint<JobMasterGateway> {
-
-	/** Gateway to connected resource manager, null iff not connected */
-	private ResourceManagerGateway resourceManager = null;
-
-	/** Logical representation of the job */
-	private final JobGraph jobGraph;
-	private final JobID jobID;
-
-	/** Configuration of the job */
-	private final Configuration configuration;
-
-	/** Service to contend for and retrieve the leadership of JM and RM */
-	private final HighAvailabilityServices highAvailabilityServices;
-
-	/** Leader Management */
-	private LeaderElectionService leaderElectionService = null;
-	private UUID leaderSessionID;
-
-	/**
-	 * The JM's Constructor
-	 *
-	 * @param jobGraph The representation of the job's execution plan
-	 * @param configuration The job's configuration
-	 * @param rpcService The RPC service at which the JM serves
-	 * @param highAvailabilityService The cluster's HA service from the JM can elect and retrieve leaders.
-	 */
-	public JobMaster(
-		JobGraph jobGraph,
-		Configuration configuration,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityService) {
-
-		super(rpcService);
-
-		this.jobGraph = Preconditions.checkNotNull(jobGraph);
-		this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
-
-		this.configuration = Preconditions.checkNotNull(configuration);
-
-		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService);
-	}
-
-	public ResourceManagerGateway getResourceManager() {
-		return resourceManager;
-	}
-
-	//----------------------------------------------------------------------------------------------
-	// Initialization methods
-	//----------------------------------------------------------------------------------------------
-	public void start() {
-		super.start();
-
-		// register at the election once the JM starts
-		registerAtElectionService();
-	}
-
-
-	//----------------------------------------------------------------------------------------------
-	// JobMaster Leadership methods
-	//----------------------------------------------------------------------------------------------
-
-	/**
-	 * Retrieves the election service and contend for the leadership.
-	 */
-	private void registerAtElectionService() {
-		try {
-			leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
-			leaderElectionService.start(new JobMasterLeaderContender());
-		} catch (Exception e) {
-			throw new RuntimeException("Fail to register at the election of JobMaster", e);
-		}
-	}
-
-	/**
-	 * Start the execution when the leadership is granted.
-	 *
-	 * @param newLeaderSessionID The identifier of the new leadership session
-	 */
-	public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID);
-
-				// The operation may be blocking, but since JM is idle before it grants the leadership, it's okay that
-				// JM waits here for the operation's completeness.
-				leaderSessionID = newLeaderSessionID;
-				leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
-
-				// TODO:: execute the job when the leadership is granted.
-			}
-		});
-	}
-
-	/**
-	 * Stop the execution when the leadership is revoked.
-	 */
-	public void revokeJobMasterLeadership() {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.info("JobManager {} was revoked leadership.", getAddress());
-
-				// TODO:: cancel the job's execution and notify all listeners
-				cancelAndClearEverything(new Exception("JobManager is no longer the leader."));
-
-				leaderSessionID = null;
-			}
-		});
-	}
-
-	/**
-	 * Handles error occurring in the leader election service
-	 *
-	 * @param exception Exception thrown in the leader election service
-	 */
-	public void onJobMasterElectionError(final Exception exception) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.error("Received an error from the LeaderElectionService.", exception);
-
-				// TODO:: cancel the job's execution and shutdown the JM
-				cancelAndClearEverything(exception);
-
-				leaderSessionID = null;
-			}
-		});
-
-	}
-
-	//----------------------------------------------------------------------------------------------
-	// RPC methods
-	//----------------------------------------------------------------------------------------------
-
-	/**
-	 * Updates the task execution state for a given task.
-	 *
-	 * @param taskExecutionState New task execution state for a given task
-	 * @return Acknowledge the task execution state update
-	 */
-	@RpcMethod
-	public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
-		System.out.println("TaskExecutionState: " + taskExecutionState);
-		return Acknowledge.get();
-	}
-
-	/**
-	 * Triggers the registration of the job master at the resource manager.
-	 *
-	 * @param address Address of the resource manager
-	 */
-	@RpcMethod
-	public void registerAtResourceManager(final String address) {
-		//TODO:: register at the RM
-	}
-
-	//----------------------------------------------------------------------------------------------
-	// Helper methods
-	//----------------------------------------------------------------------------------------------
-
-	/**
-	 * Cancel the current job and notify all listeners the job's cancellation.
-	 *
-	 * @param cause Cause for the cancelling.
-	 */
-	private void cancelAndClearEverything(Throwable cause) {
-		// currently, nothing to do here
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utility classes
-	// ------------------------------------------------------------------------
-	private class JobMasterLeaderContender implements LeaderContender {
-
-		@Override
-		public void grantLeadership(UUID leaderSessionID) {
-			JobMaster.this.grantJobMasterLeadership(leaderSessionID);
-		}
-
-		@Override
-		public void revokeLeadership() {
-			JobMaster.this.revokeJobMasterLeadership();
-		}
-
-		@Override
-		public String getAddress() {
-			return JobMaster.this.getAddress();
-		}
-
-		@Override
-		public void handleError(Exception exception) {
-			onJobMasterElectionError(exception);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
deleted file mode 100644
index 17a4c3a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.jobmaster;
-
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import scala.concurrent.Future;
-
-/**
- * {@link JobMaster} rpc gateway interface
- */
-public interface JobMasterGateway extends RpcGateway {
-
-	/**
-	 * Updates the task execution state for a given task.
-	 *
-	 * @param taskExecutionState New task execution state for a given task
-	 * @return Future acknowledge of the task execution state update
-	 */
-	Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
-
-	/**
-	 * Triggers the registration of the job master at the resource manager.
-	 *
-	 * @param address Address of the resource manager
-	 */
-	void registerAtResourceManager(final String address);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
deleted file mode 100644
index 2de560a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import java.io.Serializable;
-
-/**
- * Base class for responses given to registration attempts from {@link RetryingRegistration}.
- */
-public abstract class RegistrationResponse implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	// ----------------------------------------------------------------------------
-	
-	/**
-	 * Base class for a successful registration. Concrete registration implementations
-	 * will typically extend this class to attach more information.
-	 */
-	public static class Success extends RegistrationResponse {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public String toString() {
-			return "Registration Successful";
-		}
-	}
-
-	// ----------------------------------------------------------------------------
-
-	/**
-	 * A rejected (declined) registration.
-	 */
-	public static final class Decline extends RegistrationResponse {
-		private static final long serialVersionUID = 1L;
-
-		/** the rejection reason */
-		private final String reason;
-
-		/**
-		 * Creates a new rejection message.
-		 * 
-		 * @param reason The reason for the rejection.
-		 */
-		public Decline(String reason) {
-			this.reason = reason != null ? reason : "(unknown)";
-		}
-
-		/**
-		 * Gets the reason for the rejection.
-		 */
-		public String getReason() {
-			return reason;
-		}
-
-		@Override
-		public String toString() {
-			return "Registration Declined (" + reason + ')';
-		}
-	}
-}
-
-
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
deleted file mode 100644
index dcb5011..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.registration;
-
-import akka.dispatch.OnFailure;
-import akka.dispatch.OnSuccess;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.slf4j.Logger;
-
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.impl.Promise.DefaultPromise;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-
-/**
- * This utility class implements the basis of registering one component at another component,
- * for example registering the TaskExecutor at the ResourceManager.
- * This {@code RetryingRegistration} implements both the initial address resolution
- * and the retries-with-backoff strategy.
- * 
- * <p>The registration gives access to a future that is completed upon successful registration.
- * The registration can be canceled, for example when the target where it tries to register
- * at looses leader status.
- * 
- * @param <Gateway> The type of the gateway to connect to.
- * @param <Success> The type of the successful registration responses.
- */
-public abstract class RetryingRegistration<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> {
-
-	// ------------------------------------------------------------------------
-	//  default configuration values
-	// ------------------------------------------------------------------------
-
-	/** default value for the initial registration timeout (milliseconds) */
-	private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
-
-	/** default value for the maximum registration timeout, after exponential back-off (milliseconds) */
-	private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
-
-	/** The pause (milliseconds) made after an registration attempt caused an exception (other than timeout) */
-	private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
-
-	/** The pause (milliseconds) made after the registration attempt was refused */
-	private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
-
-	// ------------------------------------------------------------------------
-	// Fields
-	// ------------------------------------------------------------------------
-
-	private final Logger log;
-
-	private final RpcService rpcService;
-
-	private final String targetName;
-
-	private final Class<Gateway> targetType;
-
-	private final String targetAddress;
-
-	private final UUID leaderId;
-
-	private final Promise<Tuple2<Gateway, Success>> completionPromise;
-
-	private final long initialRegistrationTimeout;
-
-	private final long maxRegistrationTimeout;
-
-	private final long delayOnError;
-
-	private final long delayOnRefusedRegistration;
-
-	private volatile boolean canceled;
-
-	// ------------------------------------------------------------------------
-
-	public RetryingRegistration(
-			Logger log,
-			RpcService rpcService,
-			String targetName,
-			Class<Gateway> targetType,
-			String targetAddress,
-			UUID leaderId) {
-		this(log, rpcService, targetName, targetType, targetAddress, leaderId,
-				INITIAL_REGISTRATION_TIMEOUT_MILLIS, MAX_REGISTRATION_TIMEOUT_MILLIS,
-				ERROR_REGISTRATION_DELAY_MILLIS, REFUSED_REGISTRATION_DELAY_MILLIS);
-	}
-
-	public RetryingRegistration(
-			Logger log,
-			RpcService rpcService,
-			String targetName, 
-			Class<Gateway> targetType,
-			String targetAddress,
-			UUID leaderId,
-			long initialRegistrationTimeout,
-			long maxRegistrationTimeout,
-			long delayOnError,
-			long delayOnRefusedRegistration) {
-
-		checkArgument(initialRegistrationTimeout > 0, "initial registration timeout must be greater than zero");
-		checkArgument(maxRegistrationTimeout > 0, "maximum registration timeout must be greater than zero");
-		checkArgument(delayOnError >= 0, "delay on error must be non-negative");
-		checkArgument(delayOnRefusedRegistration >= 0, "delay on refused registration must be non-negative");
-
-		this.log = checkNotNull(log);
-		this.rpcService = checkNotNull(rpcService);
-		this.targetName = checkNotNull(targetName);
-		this.targetType = checkNotNull(targetType);
-		this.targetAddress = checkNotNull(targetAddress);
-		this.leaderId = checkNotNull(leaderId);
-		this.initialRegistrationTimeout = initialRegistrationTimeout;
-		this.maxRegistrationTimeout = maxRegistrationTimeout;
-		this.delayOnError = delayOnError;
-		this.delayOnRefusedRegistration = delayOnRefusedRegistration;
-
-		this.completionPromise = new DefaultPromise<>();
-	}
-
-	// ------------------------------------------------------------------------
-	//  completion and cancellation
-	// ------------------------------------------------------------------------
-
-	public Future<Tuple2<Gateway, Success>> getFuture() {
-		return completionPromise.future();
-	}
-
-	/**
-	 * Cancels the registration procedure.
-	 */
-	public void cancel() {
-		canceled = true;
-	}
-
-	/**
-	 * Checks if the registration was canceled.
-	 * @return True if the registration was canceled, false otherwise.
-	 */
-	public boolean isCanceled() {
-		return canceled;
-	}
-
-	// ------------------------------------------------------------------------
-	//  registration
-	// ------------------------------------------------------------------------
-
-	protected abstract Future<RegistrationResponse> invokeRegistration(
-			Gateway gateway, UUID leaderId, long timeoutMillis) throws Exception;
-
-	/**
-	 * This method resolves the target address to a callable gateway and starts the
-	 * registration after that.
-	 */
-	@SuppressWarnings("unchecked")
-	public void startRegistration() {
-		try {
-			// trigger resolution of the resource manager address to a callable gateway
-			Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
-	
-			// upon success, start the registration attempts
-			resourceManagerFuture.onSuccess(new OnSuccess<Gateway>() {
-				@Override
-				public void onSuccess(Gateway result) {
-					log.info("Resolved {} address, beginning registration", targetName);
-					register(result, 1, initialRegistrationTimeout);
-				}
-			}, rpcService.getExecutionContext());
-	
-			// upon failure, retry, unless this is cancelled
-			resourceManagerFuture.onFailure(new OnFailure() {
-				@Override
-				public void onFailure(Throwable failure) {
-					if (!isCanceled()) {
-						log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress);
-						startRegistration();
-					}
-				}
-			}, rpcService.getExecutionContext());
-		}
-		catch (Throwable t) {
-			cancel();
-			completionPromise.tryFailure(t);
-		}
-	}
-
-	/**
-	 * This method performs a registration attempt and triggers either a success notification or a retry,
-	 * depending on the result.
-	 */
-	@SuppressWarnings("unchecked")
-	private void register(final Gateway gateway, final int attempt, final long timeoutMillis) {
-		// eager check for canceling to avoid some unnecessary work
-		if (canceled) {
-			return;
-		}
-
-		try {
-			log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
-			Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
-	
-			// if the registration was successful, let the TaskExecutor know
-			registrationFuture.onSuccess(new OnSuccess<RegistrationResponse>() {
-				
-				@Override
-				public void onSuccess(RegistrationResponse result) throws Throwable {
-					if (!isCanceled()) {
-						if (result instanceof RegistrationResponse.Success) {
-							// registration successful!
-							Success success = (Success) result;
-							completionPromise.success(new Tuple2<>(gateway, success));
-						}
-						else {
-							// registration refused or unknown
-							if (result instanceof RegistrationResponse.Decline) {
-								RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result;
-								log.info("Registration at {} was declined: {}", targetName, decline.getReason());
-							} else {
-								log.error("Received unknown response to registration attempt: " + result);
-							}
-
-							log.info("Pausing and re-attempting registration in {} ms", delayOnRefusedRegistration);
-							registerLater(gateway, 1, initialRegistrationTimeout, delayOnRefusedRegistration);
-						}
-					}
-				}
-			}, rpcService.getExecutionContext());
-	
-			// upon failure, retry
-			registrationFuture.onFailure(new OnFailure() {
-				@Override
-				public void onFailure(Throwable failure) {
-					if (!isCanceled()) {
-						if (failure instanceof TimeoutException) {
-							// we simply have not received a response in time. maybe the timeout was
-							// very low (initial fast registration attempts), maybe the target endpoint is
-							// currently down.
-							if (log.isDebugEnabled()) {
-								log.debug("Registration at {} ({}) attempt {} timed out after {} ms",
-										targetName, targetAddress, attempt, timeoutMillis);
-							}
-	
-							long newTimeoutMillis = Math.min(2 * timeoutMillis, maxRegistrationTimeout);
-							register(gateway, attempt + 1, newTimeoutMillis);
-						}
-						else {
-							// a serious failure occurred. we still should not give up, but keep trying
-							log.error("Registration at " + targetName + " failed due to an error", failure);
-							log.info("Pausing and re-attempting registration in {} ms", delayOnError);
-	
-							registerLater(gateway, 1, initialRegistrationTimeout, delayOnError);
-						}
-					}
-				}
-			}, rpcService.getExecutionContext());
-		}
-		catch (Throwable t) {
-			cancel();
-			completionPromise.tryFailure(t);
-		}
-	}
-
-	private void registerLater(final Gateway gateway, final int attempt, final long timeoutMillis, long delay) {
-		rpcService.scheduleRunnable(new Runnable() {
-			@Override
-			public void run() {
-				register(gateway, attempt, timeoutMillis);
-			}
-		}, delay, TimeUnit.MILLISECONDS);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
deleted file mode 100644
index 7a2deae..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import java.io.Serializable;
-
-public class JobMasterRegistration implements Serializable {
-	private static final long serialVersionUID = 8411214999193765202L;
-
-	private final String address;
-
-	public JobMasterRegistration(String address) {
-		this.address = address;
-	}
-
-	public String getAddress() {
-		return address;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
deleted file mode 100644
index 8ac9e49..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.runtime.instance.InstanceID;
-
-import java.io.Serializable;
-
-public class RegistrationResponse implements Serializable {
-	private static final long serialVersionUID = -2379003255993119993L;
-
-	private final boolean isSuccess;
-	private final InstanceID instanceID;
-
-	public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
-		this.isSuccess = isSuccess;
-		this.instanceID = instanceID;
-	}
-
-	public boolean isSuccess() {
-		return isSuccess;
-	}
-
-	public InstanceID getInstanceID() {
-		return instanceID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
deleted file mode 100644
index f7147c9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import akka.dispatch.Mapper;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
-
-import scala.concurrent.Future;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * ResourceManager implementation. The resource manager is responsible for resource de-/allocation
- * and bookkeeping.
- *
- * It offers the following methods as part of its rpc interface to interact with the him remotely:
- * <ul>
- *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li>
- *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
- * </ul>
- */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
-	private final HighAvailabilityServices highAvailabilityServices;
-	private LeaderElectionService leaderElectionService = null;
-	private UUID leaderSessionID = null;
-
-	public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) {
-		super(rpcService);
-		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
-		this.jobMasterGateways = new HashMap<>();
-	}
-
-	@Override
-	public void start() {
-		// start a leader
-		try {
-			super.start();
-			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
-			leaderElectionService.start(new ResourceManagerLeaderContender());
-		} catch (Throwable e) {
-			log.error("A fatal error happened when starting the ResourceManager", e);
-			throw new RuntimeException("A fatal error happened when starting the ResourceManager", e);
-		}
-	}
-
-	@Override
-	public void shutDown() {
-		try {
-			leaderElectionService.stop();
-			super.shutDown();
-		} catch(Throwable e) {
-			log.error("A fatal error happened when shutdown the ResourceManager", e);
-			throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e);
-		}
-	}
-
-	/**
-	 * Gets the leader session id of current resourceManager.
-	 *
-	 * @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership.
-	 */
-	@VisibleForTesting
-	UUID getLeaderSessionID() {
-		return leaderSessionID;
-	}
-
-	/**
-	 * Register a {@link JobMaster} at the resource manager.
-	 *
-	 * @param jobMasterRegistration Job master registration information
-	 * @return Future registration response
-	 */
-	@RpcMethod
-	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
-
-		return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
-			@Override
-			public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
-				InstanceID instanceID;
-
-				if (jobMasterGateways.containsKey(jobMasterGateway)) {
-					instanceID = jobMasterGateways.get(jobMasterGateway);
-				} else {
-					instanceID = new InstanceID();
-					jobMasterGateways.put(jobMasterGateway, instanceID);
-				}
-
-				return new RegistrationResponse(true, instanceID);
-			}
-		}, getMainThreadExecutionContext());
-	}
-
-	/**
-	 * Requests a slot from the resource manager.
-	 *
-	 * @param slotRequest Slot request
-	 * @return Slot assignment
-	 */
-	@RpcMethod
-	public SlotAssignment requestSlot(SlotRequest slotRequest) {
-		System.out.println("SlotRequest: " + slotRequest);
-		return new SlotAssignment();
-	}
-
-
-	/**
-	 *
-	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
-	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
-	 * @param resourceID               The resource ID of the TaskExecutor that registers
-	 *
-	 * @return The response by the ResourceManager.
-	 */
-	@RpcMethod
-	public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
-			UUID resourceManagerLeaderId,
-			String taskExecutorAddress,
-			ResourceID resourceID) {
-
-		return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
-	}
-
-	private class ResourceManagerLeaderContender implements LeaderContender {
-
-		/**
-		 * Callback method when current resourceManager is granted leadership
-		 *
-		 * @param leaderSessionID unique leadershipID
-		 */
-		@Override
-		public void grantLeadership(final UUID leaderSessionID) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
-					ResourceManager.this.leaderSessionID = leaderSessionID;
-					// confirming the leader session ID might be blocking,
-					leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-				}
-			});
-		}
-
-		/**
-		 * Callback method when current resourceManager lose leadership.
-		 */
-		@Override
-		public void revokeLeadership() {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.info("ResourceManager {} was revoked leadership.", getAddress());
-					jobMasterGateways.clear();
-					leaderSessionID = null;
-				}
-			});
-		}
-
-		@Override
-		public String getAddress() {
-			return ResourceManager.this.getAddress();
-		}
-
-		/**
-		 * Handles error occurring in the leader election service
-		 *
-		 * @param exception Exception being thrown in the leader election service
-		 */
-		@Override
-		public void handleError(final Exception exception) {
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					log.error("ResourceManager received an error from the LeaderElectionService.", exception);
-					// terminate ResourceManager in case of an error
-					shutDown();
-				}
-			});
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
deleted file mode 100644
index afddb01..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-
-/**
- * The {@link ResourceManager}'s RPC gateway interface.
- */
-public interface ResourceManagerGateway extends RpcGateway {
-
-	/**
-	 * Register a {@link JobMaster} at the resource manager.
-	 *
-	 * @param jobMasterRegistration Job master registration information
-	 * @param timeout Timeout for the future to complete
-	 * @return Future registration response
-	 */
-	Future<RegistrationResponse> registerJobMaster(
-		JobMasterRegistration jobMasterRegistration,
-		@RpcTimeout FiniteDuration timeout);
-
-	/**
-	 * Register a {@link JobMaster} at the resource manager.
-	 *
-	 * @param jobMasterRegistration Job master registration information
-	 * @return Future registration response
-	 */
-	Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
-
-	/**
-	 * Requests a slot from the resource manager.
-	 *
-	 * @param slotRequest Slot request
-	 * @return Future slot assignment
-	 */
-	Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
-
-	/**
-	 * 
-	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader 
-	 * @param taskExecutorAddress      The address of the TaskExecutor that registers
-	 * @param resourceID               The resource ID of the TaskExecutor that registers
-	 * @param timeout                  The timeout for the response.
-	 * 
-	 * @return The future to the response by the ResourceManager.
-	 */
-	Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
-			UUID resourceManagerLeaderId,
-			String taskExecutorAddress,
-			ResourceID resourceID,
-			@RpcTimeout FiniteDuration timeout);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
deleted file mode 100644
index 86cd8b7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import java.io.Serializable;
-
-public class SlotAssignment implements Serializable{
-	private static final long serialVersionUID = -6990813455942742322L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
deleted file mode 100644
index 74c7c39..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.resourcemanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * This describes the requirement of the slot, mainly used by JobManager requesting slot from ResourceManager.
- */
-public class SlotRequest implements Serializable {
-
-	private static final long serialVersionUID = -6586877187990445986L;
-
-	/** The JobID of the slot requested for */
-	private final JobID jobId;
-
-	/** The unique identification of this request */
-	private final AllocationID allocationId;
-
-	/** The resource profile of the required slot */
-	private final ResourceProfile resourceProfile;
-
-	public SlotRequest(JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile) {
-		this.jobId = checkNotNull(jobId);
-		this.allocationId = checkNotNull(allocationId);
-		this.resourceProfile = checkNotNull(resourceProfile);
-	}
-
-	/**
-	 * Get the JobID of the slot requested for.
-	 * @return The job id
-	 */
-	public JobID getJobId() {
-		return jobId;
-	}
-
-	/**
-	 * Get the unique identification of this request
-	 * @return the allocation id
-	 */
-	public AllocationID getAllocationId() {
-		return allocationId;
-	}
-
-	/**
-	 * Get the resource profile of the desired slot
-	 * @return The resource profile
-	 */
-	public ResourceProfile getResourceProfile() {
-		return resourceProfile;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
deleted file mode 100644
index c372ecb..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A report about the current status of all slots of the TaskExecutor, describing
- * which slots are available and allocated, and what jobs (JobManagers) the allocated slots
- * have been allocated to.
- */
-public class SlotReport implements Serializable {
-
-	private static final long serialVersionUID = -3150175198722481689L;
-
-	/** The slots status of the TaskManager */
-	private final List<SlotStatus> slotsStatus;
-
-	/** The resource id which identifies the TaskManager */
-	private final ResourceID resourceID;
-
-	public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID resourceID) {
-		this.slotsStatus = checkNotNull(slotsStatus);
-		this.resourceID = checkNotNull(resourceID);
-	}
-
-	public List<SlotStatus> getSlotsStatus() {
-		return slotsStatus;
-	}
-
-	public ResourceID getResourceID() {
-		return resourceID;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c247d1f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
deleted file mode 100644
index e8e2084..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.taskexecutor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-
-import java.io.Serializable;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * This describes the slot current status which located in TaskManager.
- */
-public class SlotStatus implements Serializable {
-
-	private static final long serialVersionUID = 5099191707339664493L;
-
-	/** slotID to identify a slot */
-	private final SlotID slotID;
-
-	/** the resource profile of the slot */
-	private final ResourceProfile profiler;
-
-	/** if the slot is allocated, allocationId identify its allocation; else, allocationId is null */
-	private final AllocationID allocationID;
-
-	/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
-	private final JobID jobID;
-
-	public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-		this(slotID, profiler, null, null);
-	}
-
-	public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) {
-		this.slotID = checkNotNull(slotID, "slotID cannot be null");
-		this.profiler = checkNotNull(profiler, "profile cannot be null");
-		this.allocationID = allocationID;
-		this.jobID = jobID;
-	}
-
-	/**
-	 * Get the unique identification of this slot
-	 *
-	 * @return The slot id
-	 */
-	public SlotID getSlotID() {
-		return slotID;
-	}
-
-	/**
-	 * Get the resource profile of this slot
-	 *
-	 * @return The resource profile
-	 */
-	public ResourceProfile getProfiler() {
-		return profiler;
-	}
-
-	/**
-	 * Get the allocation id of this slot
-	 *
-	 * @return The allocation id if this slot is allocated, otherwise null
-	 */
-	public AllocationID getAllocationID() {
-		return allocationID;
-	}
-
-	/**
-	 * Get the job id of the slot allocated for
-	 *
-	 * @return The job id if this slot is allocated, otherwise null
-	 */
-	public JobID getJobID() {
-		return jobID;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		SlotStatus that = (SlotStatus) o;
-
-		if (!slotID.equals(that.slotID)) {
-			return false;
-		}
-		if (!profiler.equals(that.profiler)) {
-			return false;
-		}
-		if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) {
-			return false;
-		}
-		return jobID != null ? jobID.equals(that.jobID) : that.jobID == null;
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = slotID.hashCode();
-		result = 31 * result + profiler.hashCode();
-		result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0);
-		result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
-		return result;
-	}
-
-}