You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this version.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:31 UTC

[12/52] [abbrv] flink git commit: [FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager

[FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cbec02f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cbec02f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cbec02f

Branch: refs/heads/master
Commit: 5cbec02f1055218db1531bd67ebfe69a24746852
Parents: 106cb9e
Author: Kurt Young <yk...@gmail.com>
Authored: Sun Oct 16 22:20:38 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java |  1 -
 .../jobmanager/slots/PooledSlotProvider.java    | 73 ++++++++++++++++++++
 .../flink/runtime/jobmaster/JobMaster.java      | 24 ++++---
 3 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5cbec02f/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index beca74d..7e7b21e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner {
 
 		internalAllocateSlot(jobID, allocationID, resourceProfile, future);
 
-		final SlotOwner owner = this;
 		return future.thenApplyAsync(
 			new ApplyFunction<SlotDescriptor, SimpleSlot>() {
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5cbec02f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
new file mode 100644
index 0000000..5655fc2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple pool-based slot provider that uses a {@link SlotPool} as the underlying storage.
+ *
+ * <p>Slot requests are served synchronously: {@link #allocateSlot(ScheduledUnit, boolean)}
+ * blocks the calling thread for at most the configured timeout and then either returns an
+ * already completed future or throws a {@link NoResourceAvailableException}.
+ */
+public class PooledSlotProvider implements SlotProvider {
+
+	/** The pool which holds all the slots. */
+	private final SlotPool slotPool;
+
+	/** The maximum time to wait for a single slot allocation. */
+	private final Time timeout;
+
+	/**
+	 * Creates a slot provider backed by the given pool.
+	 *
+	 * @param slotPool the pool from which slots are allocated, must not be null
+	 * @param timeout the maximum time to wait for a single slot allocation, must not be null
+	 */
+	public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
+		this.slotPool = checkNotNull(slotPool);
+		this.timeout = checkNotNull(timeout);
+	}
+
+	/**
+	 * Allocates a slot for the given task from the slot pool, waiting at most the configured timeout.
+	 *
+	 * <p>The {@code allowQueued} flag is currently ignored: the call always waits (up to the
+	 * timeout) for the pool to deliver a slot.
+	 *
+	 * @param task the task to allocate a slot for, must not be null
+	 * @param allowQueued currently ignored
+	 * @return an already completed future holding the allocated slot
+	 * @throws NoResourceAvailableException if the allocation is interrupted, fails, or times out;
+	 *                                      the underlying exception is attached as the cause
+	 */
+	@Override
+	public Future<SimpleSlot> allocateSlot(ScheduledUnit task,
+			boolean allowQueued) throws NoResourceAvailableException
+	{
+		checkNotNull(task);
+
+		final JobID jobID = task.getTaskToExecute().getVertex().getJobId();
+		final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
+		try {
+			final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit());
+			return FlinkCompletableFuture.completed(slot);
+		} catch (InterruptedException e) {
+			// restore the interrupt flag so that code further up the stack can still observe it
+			Thread.currentThread().interrupt();
+			throw withCause(
+					new NoResourceAvailableException("Could not allocate a slot because it's interrupted."), e);
+		} catch (ExecutionException e) {
+			throw withCause(new NoResourceAvailableException("Could not allocate a slot because some error occurred " +
+					"during allocation, " + e.getMessage()), e);
+		} catch (TimeoutException e) {
+			// NOTE(review): the pending request is not withdrawn from the pool here, so a slot that
+			// arrives after the timeout may stay allocated -- confirm how SlotPool handles this
+			throw withCause(
+					new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout), e);
+		}
+	}
+
+	/**
+	 * Attaches the given cause to the exception so that the original failure is not lost.
+	 */
+	private static NoResourceAvailableException withCause(NoResourceAvailableException exception, Throwable cause) {
+		exception.initCause(cause);
+		return exception;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cbec02f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 81a3e23..e6720fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -57,7 +58,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -84,7 +85,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -93,6 +93,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -145,6 +146,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** The execution graph of this job */
 	private final ExecutionGraph executionGraph;
 
+	private final SlotPool slotPool;
+
+	private final Time allocationTimeout;
 
 	private volatile UUID leaderSessionID;
 
@@ -156,8 +160,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Connection with ResourceManager, null if not located address yet or we close it initiative */
 	private ResourceManagerConnection resourceManagerConnection;
 
-	// TODO - we need to replace this with the slot pool
-	private final Scheduler scheduler;
 
 	// ------------------------------------------------------------------------
 
@@ -240,8 +242,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				-1,
 				log);
 
-		// TODO - temp fix
-		this.scheduler = new Scheduler(executorService);
+		this.slotPool = new SlotPool(executorService);
+		this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -263,6 +265,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
 			super.start();
 
+			slotPool.setJobManagerLeaderId(leaderSessionID);
 			log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
 			getSelf().startJobExecution();
 		} else {
@@ -338,7 +341,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			@Override
 			public void run() {
 				try {
-					executionGraph.scheduleForExecution(scheduler);
+					executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
 				} catch (Throwable t) {
 					executionGraph.fail(t);
 				}
@@ -366,6 +369,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		((StartStoppable) getSelf()).stop();
 
 		leaderSessionID = null;
+		slotPool.setJobManagerLeaderId(null);
 		executionGraph.suspend(cause);
 
 		// disconnect from resource manager:
@@ -777,9 +781,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
 				// verify the response with current connection
 				if (resourceManagerConnection != null
-						&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+						&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+				{
 					log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
 							success.getResourceManagerLeaderId());
+					slotPool.setResourceManager(success.getResourceManagerLeaderId(),
+							resourceManagerConnection.getTargetGateway());
 				}
 			}
 		});
@@ -790,6 +797,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
+		slotPool.disconnectResourceManager();
 	}
 
 	//----------------------------------------------------------------------------------------------