You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/01 08:40:59 UTC
[32/50] [abbrv] flink git commit: [FLINK-4689] [cluster management]
Implement a simple slot provider for the new job manager
[FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a02232e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a02232e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a02232e
Branch: refs/heads/flip-6
Commit: 0a02232ed69c7df210508bf944e39a0d61d059c9
Parents: cd069f0
Author: Kurt Young <yk...@gmail.com>
Authored: Sun Oct 16 22:20:38 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 1 09:39:32 2016 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/instance/SlotPool.java | 1 -
.../jobmanager/slots/PooledSlotProvider.java | 73 ++++++++++++++++++++
.../flink/runtime/jobmaster/JobMaster.java | 24 ++++---
3 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0a02232e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index e7857c1..de952c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner {
internalAllocateSlot(jobID, allocationID, resourceProfile, future);
- final SlotOwner owner = this;
return future.thenApplyAsync(
new ApplyFunction<SlotDescriptor, SimpleSlot>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0a02232e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
new file mode 100644
index 0000000..5655fc2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple pool based slot provider with {@link SlotPool} as the underlying storage.
+ */
+public class PooledSlotProvider implements SlotProvider {
+
+ /** The pool which holds all the slots. */
+ private final SlotPool slotPool;
+
+ /** The timeout for allocation. */
+ private final Time timeout;
+
+ public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
+ this.slotPool = slotPool;
+ this.timeout = timeout;
+ }
+
+ @Override
+ public Future<SimpleSlot> allocateSlot(ScheduledUnit task,
+ boolean allowQueued) throws NoResourceAvailableException
+ {
+ checkNotNull(task);
+
+ final JobID jobID = task.getTaskToExecute().getVertex().getJobId();
+ final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
+ try {
+ final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit());
+ return FlinkCompletableFuture.completed(slot);
+ } catch (InterruptedException e) {
+ throw new NoResourceAvailableException("Could not allocate a slot because it's interrupted.");
+ } catch (ExecutionException e) {
+ throw new NoResourceAvailableException("Could not allocate a slot because some error occurred " +
+ "during allocation, " + e.getMessage());
+ } catch (TimeoutException e) {
+ throw new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a02232e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a7be476..05c20d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.io.network.PartitionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -56,7 +57,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -84,7 +85,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
-
import org.slf4j.Logger;
import javax.annotation.Nullable;
@@ -93,6 +93,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -145,6 +146,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** The execution graph of this job */
private final ExecutionGraph executionGraph;
+ private final SlotPool slotPool;
+
+ private final Time allocationTimeout;
private volatile UUID leaderSessionID;
@@ -156,8 +160,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Connection with the ResourceManager; null if its address has not been located yet or if we closed the connection ourselves */
private ResourceManagerConnection resourceManagerConnection;
- // TODO - we need to replace this with the slot pool
- private final Scheduler scheduler;
// ------------------------------------------------------------------------
@@ -239,8 +241,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
-1,
log);
- // TODO - temp fix
- this.scheduler = new Scheduler(executorService);
+ this.slotPool = new SlotPool(executorService);
+ this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
}
//----------------------------------------------------------------------------------------------
@@ -262,6 +264,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
super.start();
+ slotPool.setJobManagerLeaderId(leaderSessionID);
log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
getSelf().startJobExecution();
} else {
@@ -337,7 +340,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@Override
public void run() {
try {
- executionGraph.scheduleForExecution(scheduler);
+ executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
} catch (Throwable t) {
executionGraph.fail(t);
}
@@ -365,6 +368,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
((StartStoppable) getSelf()).stop();
leaderSessionID = null;
+ slotPool.setJobManagerLeaderId(null);
executionGraph.suspend(cause);
// disconnect from resource manager:
@@ -783,9 +787,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
// verify the response with current connection
if (resourceManagerConnection != null
- && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+ && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+ {
log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
success.getResourceManagerLeaderId());
+ slotPool.setResourceManager(success.getResourceManagerLeaderId(),
+ resourceManagerConnection.getTargetGateway());
}
}
});
@@ -796,6 +803,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
resourceManagerConnection.close();
resourceManagerConnection = null;
}
+ slotPool.disconnectResourceManager();
}
//----------------------------------------------------------------------------------------------