You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/14 08:07:54 UTC
[2/3] flink git commit: [FLINK-8078] Introduce LogicalSlot interface
[FLINK-8078] Introduce LogicalSlot interface
The LogicalSlot interface decouples task deployment from the actual
slot implementation, which at the moment is one of Slot, SimpleSlot or SharedSlot.
This is a helpful step towards introducing a different slot implementation for
Flip-6.
This closes #5086.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb9c64b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb9c64b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb9c64b1
Branch: refs/heads/master
Commit: bb9c64b1222a5e9568cf93186a6420bebcb306f9
Parents: 0d55164
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 14 23:50:52 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Dec 13 22:37:32 2017 +0100
----------------------------------------------------------------------
.../InputChannelDeploymentDescriptor.java | 9 +-
.../flink/runtime/executiongraph/Execution.java | 62 ++---
.../runtime/executiongraph/ExecutionVertex.java | 15 +-
.../flink/runtime/instance/LogicalSlot.java | 93 ++++++++
.../flink/runtime/instance/SimpleSlot.java | 16 +-
.../apache/flink/runtime/instance/SlotPool.java | 20 +-
.../flink/runtime/instance/SlotPoolGateway.java | 2 +-
.../flink/runtime/instance/SlotProvider.java | 2 +-
.../runtime/jobmanager/scheduler/Scheduler.java | 13 +-
.../flink/runtime/jobmaster/JobMaster.java | 4 +-
.../InputChannelDeploymentDescriptorTest.java | 19 +-
.../ExecutionGraphDeploymentTest.java | 5 +-
.../ExecutionGraphMetricsTest.java | 53 +----
.../ExecutionGraphSchedulingTest.java | 17 +-
.../runtime/executiongraph/ExecutionTest.java | 3 +-
.../ExecutionVertexDeploymentTest.java | 10 +-
.../executiongraph/ProgrammedSlotProvider.java | 18 +-
.../utils/SimpleSlotProvider.java | 3 +-
.../flink/runtime/instance/SharedSlotsTest.java | 3 +-
.../flink/runtime/instance/SimpleSlotTest.java | 16 +-
.../flink/runtime/instance/SlotPoolRpcTest.java | 10 +-
.../flink/runtime/instance/SlotPoolTest.java | 48 ++--
.../runtime/instance/TestingLogicalSlot.java | 114 ++++++++++
.../PartialConsumePipelinedResultTest.java | 4 +-
.../ScheduleOrUpdateConsumersTest.java | 3 +-
.../ScheduleWithCoLocationHintTest.java | 178 +++++++--------
.../scheduler/SchedulerIsolatedTasksTest.java | 85 ++++---
.../scheduler/SchedulerSlotSharingTest.java | 226 +++++++++----------
28 files changed, 614 insertions(+), 437 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index fe1c599..8d76207 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -85,10 +85,9 @@ public class InputChannelDeploymentDescriptor implements Serializable {
*/
public static InputChannelDeploymentDescriptor[] fromEdges(
ExecutionEdge[] edges,
- SimpleSlot consumerSlot,
+ ResourceID consumerResourceId,
boolean allowLazyDeployment) throws ExecutionGraphException {
- final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length];
// Each edge is connected to a different result partition
@@ -97,7 +96,7 @@ public class InputChannelDeploymentDescriptor implements Serializable {
final Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt();
final ExecutionState producerState = producer.getState();
- final SimpleSlot producerSlot = producer.getAssignedResource();
+ final LogicalSlot producerSlot = producer.getAssignedResource();
final ResultPartitionLocation partitionLocation;
@@ -111,7 +110,7 @@ public class InputChannelDeploymentDescriptor implements Serializable {
final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
- if (partitionTaskManager.equals(consumerTaskManager)) {
+ if (partitionTaskManager.equals(consumerResourceId)) {
// Consuming task is deployed to the same TaskManager as the partition => local
partitionLocation = ResultPartitionLocation.createLocal();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 38c3821..00a452d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -98,9 +98,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
- private static final AtomicReferenceFieldUpdater<Execution, SimpleSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+ private static final AtomicReferenceFieldUpdater<Execution, LogicalSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
Execution.class,
- SimpleSlot.class,
+ LogicalSlot.class,
"assignedResource");
private static final Logger LOG = ExecutionGraph.LOG;
@@ -141,7 +141,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private volatile ExecutionState state = CREATED;
- private volatile SimpleSlot assignedResource;
+ private volatile LogicalSlot assignedResource;
private volatile Throwable failureCause; // once assigned, never changes
@@ -240,7 +240,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
return taskManagerLocationFuture;
}
- public SimpleSlot getAssignedResource() {
+ public LogicalSlot getAssignedResource() {
return assignedResource;
}
@@ -248,21 +248,21 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* Tries to assign the given slot to the execution. The assignment works only if the
* Execution is in state SCHEDULED. Returns true, if the resource could be assigned.
*
- * @param slot to assign to this execution
+ * @param logicalSlot to assign to this execution
* @return true if the slot could be assigned to the execution, otherwise false
*/
@VisibleForTesting
- boolean tryAssignResource(final SimpleSlot slot) {
- checkNotNull(slot);
+ boolean tryAssignResource(final LogicalSlot logicalSlot) {
+ checkNotNull(logicalSlot);
// only allow to set the assigned resource in state SCHEDULED or CREATED
// note: we also accept resource assignment when being in state CREATED for testing purposes
if (state == SCHEDULED || state == CREATED) {
- if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, slot)) {
+ if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, logicalSlot)) {
// check for concurrent modification (e.g. cancelling call)
if (state == SCHEDULED || state == CREATED) {
checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
- taskManagerLocationFuture.complete(slot.getTaskManagerLocation());
+ taskManagerLocationFuture.complete(logicalSlot.getTaskManagerLocation());
return true;
} else {
@@ -283,7 +283,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
@Override
public TaskManagerLocation getAssignedResourceLocation() {
// returns non-null only when a location is already assigned
- final SimpleSlot currentAssignedResource = assignedResource;
+ final LogicalSlot currentAssignedResource = assignedResource;
return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : null;
}
@@ -442,14 +442,14 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
queued,
preferredLocations))
.thenApply(
- (SimpleSlot slot) -> {
- if (tryAssignResource(slot)) {
+ (LogicalSlot logicalSlot) -> {
+ if (tryAssignResource(logicalSlot)) {
return this;
} else {
// release the slot
- slot.releaseSlot();
+ logicalSlot.releaseSlot();
- throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
+ throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
}
});
}
@@ -465,7 +465,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @throws JobException if the execution cannot be deployed to the assigned resource
*/
public void deploy() throws JobException {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
@@ -493,7 +493,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
try {
// good, we are allowed to deploy
- if (!slot.setExecutedVertex(this)) {
+ if (!slot.setExecution(this)) {
throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
}
@@ -545,7 +545,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* Sends stop RPC call.
*/
public void stop() {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -608,7 +608,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
try {
vertex.getExecutionGraph().deregisterExecution(this);
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
slot.releaseSlot();
@@ -691,7 +691,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// ----------------------------------------------------------------
else {
if (consumerState == RUNNING) {
- final SimpleSlot consumerSlot = consumer.getAssignedResource();
+ final LogicalSlot consumerSlot = consumer.getAssignedResource();
if (consumerSlot == null) {
// The consumer has been reset concurrently
@@ -702,7 +702,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
.getCurrentAssignedResource().getTaskManagerLocation();
final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
- final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
+ final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
@@ -778,7 +778,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
int maxStrackTraceDepth,
Time timeout) {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -802,7 +802,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @param timestamp of the completed checkpoint
*/
public void notifyCheckpointComplete(long checkpointId, long timestamp) {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -822,7 +822,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @param checkpointOptions of the checkpoint to trigger
*/
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -880,7 +880,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
updateAccumulatorsAndMetrics(userAccumulators, metrics);
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
slot.releaseSlot();
@@ -938,7 +938,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (transitionState(current, CANCELED)) {
try {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
slot.releaseSlot();
@@ -1035,7 +1035,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
updateAccumulatorsAndMetrics(userAccumulators, metrics);
try {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
slot.releaseSlot();
}
@@ -1119,7 +1119,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* The sending is tried up to NUM_CANCEL_CALL_TRIES times.
*/
private void sendCancelRpcCall() {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1140,7 +1140,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
private void sendFailIntermediateResultPartitionsRpcCall() {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1158,7 +1158,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private void sendUpdatePartitionInfoRpcCall(
final Iterable<PartitionInfo> partitionInfos) {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1318,7 +1318,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
@Override
public String toString() {
- final SimpleSlot slot = assignedResource;
+ final LogicalSlot slot = assignedResource;
return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
(slot == null ? "(unassigned)" : slot), state);
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6d45d06..c2c986f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -272,7 +273,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
return currentExecution.getTaskManagerLocationFuture();
}
- public SimpleSlot getCurrentAssignedResource() {
+ public LogicalSlot getCurrentAssignedResource() {
return currentExecution.getAssignedResource();
}
@@ -744,7 +745,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
*/
TaskDeploymentDescriptor createDeploymentDescriptor(
ExecutionAttemptID executionId,
- SimpleSlot targetSlot,
+ LogicalSlot targetSlot,
TaskStateSnapshot taskStateHandles,
int attemptNumber) throws ExecutionGraphException {
@@ -779,8 +780,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
for (ExecutionEdge[] edges : inputEdges) {
- InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor
- .fromEdges(edges, targetSlot, lazyScheduling);
+ InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor.fromEdges(
+ edges,
+ targetSlot.getTaskManagerLocation().getResourceID(),
+ lazyScheduling);
// If the produced partition has multiple consumers registered, we
// need to request the one matching our sub task index.
@@ -829,10 +832,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
serializedJobInformation,
serializedTaskInformation,
executionId,
- targetSlot.getAllocatedSlot().getSlotAllocationId(),
+ targetSlot.getAllocationId(),
subTaskIndex,
attemptNumber,
- targetSlot.getRoot().getSlotNumber(),
+ targetSlot.getPhysicalSlotNumber(),
taskStateHandles,
producedPartitions,
consumedPartitions);
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
new file mode 100644
index 0000000..3ebe107
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/LogicalSlot.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+ /**
+ * Return the TaskManager location of this slot
+ *
+ * @return TaskManager location of this slot
+ */
+ TaskManagerLocation getTaskManagerLocation();
+
+ /**
+ * Return the TaskManager gateway to talk to the TaskManager.
+ *
+ * @return TaskManager gateway to talk to the TaskManager
+ */
+ TaskManagerGateway getTaskManagerGateway();
+
+ /**
+ * True if the slot is still alive.
+ *
+ * @return True if the slot is still alive, otherwise false
+ */
+ boolean isAlive();
+
+ /**
+ * True if the slot is canceled.
+ *
+ * @return True if the slot is canceled, otherwise false
+ */
+ boolean isCanceled();
+
+ /**
+ * True if the slot is released.
+ *
+ * @return True if the slot is released, otherwise false
+ */
+ boolean isReleased();
+
+ /**
+ * Sets the execution for this slot.
+ *
+ * @param execution to set for this slot
+ * @return true if the slot could be set, otherwise false
+ */
+ boolean setExecution(Execution execution);
+
+ /**
+ * Releases this slot.
+ */
+ void releaseSlot();
+
+ /**
+ * Gets the slot number on the TaskManager.
+ *
+ * @return slot number
+ */
+ int getPhysicalSlotNumber();
+
+ /**
+ * Gets the allocation id of this slot.
+ *
+ * @return allocation id of this slot
+ */
+ AllocationID getAllocationId();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index 8c7ec01..9591028 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.instance;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.api.common.JobID;
@@ -37,7 +38,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
* <p>If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot.
* If not, then the parent attribute is null.
*/
-public class SimpleSlot extends Slot {
+public class SimpleSlot extends Slot implements LogicalSlot {
/** The updater used to atomically swap in the execution */
private static final AtomicReferenceFieldUpdater<SimpleSlot, Execution> VERTEX_UPDATER =
@@ -163,7 +164,8 @@ public class SimpleSlot extends Slot {
* @param executedVertex The vertex to assign to this slot.
* @return True, if the vertex was assigned, false, otherwise.
*/
- public boolean setExecutedVertex(Execution executedVertex) {
+ @Override
+ public boolean setExecution(Execution executedVertex) {
if (executedVertex == null) {
throw new NullPointerException();
}
@@ -231,6 +233,16 @@ public class SimpleSlot extends Slot {
}
}
+ @Override
+ public int getPhysicalSlotNumber() {
+ return getRootSlotNumber();
+ }
+
+ @Override
+ public AllocationID getAllocationId() {
+ return getAllocatedSlot().getSlotAllocationId();
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 7e98e11..66af865 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -266,7 +266,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
// ------------------------------------------------------------------------
@Override
- public CompletableFuture<SimpleSlot> allocateSlot(
+ public CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestID requestId,
ScheduledUnit task,
ResourceProfile resources,
@@ -303,7 +303,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
return CompletableFuture.completedFuture(Acknowledge.get());
}
- CompletableFuture<SimpleSlot> internalAllocateSlot(
+ CompletableFuture<LogicalSlot> internalAllocateSlot(
SlotRequestID requestId,
ScheduledUnit task,
ResourceProfile resources,
@@ -318,7 +318,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
// the request will be completed by a future
- final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
+ final CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
// (2) need to request a slot
if (resourceManagerGateway == null) {
@@ -433,7 +433,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
private void stashRequestWaitingForResourceManager(
final SlotRequestID requestId,
final ResourceProfile resources,
- final CompletableFuture<SimpleSlot> future) {
+ final CompletableFuture<LogicalSlot> future) {
LOG.info("Cannot serve slot request, no ResourceManager connected. " +
"Adding as pending request {}", requestId);
@@ -1087,15 +1087,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
}
@Override
- public CompletableFuture<SimpleSlot> allocateSlot(
+ public CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
final SlotRequestID requestId = new SlotRequestID();
- CompletableFuture<SimpleSlot> slotFuture = gateway.allocateSlot(requestId, task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
+ CompletableFuture<LogicalSlot> slotFuture = gateway.allocateSlot(requestId, task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
slotFuture.whenComplete(
- (SimpleSlot slot, Throwable failure) -> {
+ (LogicalSlot slot, Throwable failure) -> {
if (failure != null) {
gateway.cancelSlotAllocation(requestId);
}
@@ -1113,13 +1113,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
private final SlotRequestID slotRequestId;
- private final CompletableFuture<SimpleSlot> future;
+ private final CompletableFuture<LogicalSlot> future;
private final ResourceProfile resourceProfile;
PendingRequest(
SlotRequestID slotRequestId,
- CompletableFuture<SimpleSlot> future,
+ CompletableFuture<LogicalSlot> future,
ResourceProfile resourceProfile) {
this.slotRequestId = Preconditions.checkNotNull(slotRequestId);
this.future = Preconditions.checkNotNull(future);
@@ -1130,7 +1130,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
return slotRequestId;
}
- public CompletableFuture<SimpleSlot> getFuture() {
+ public CompletableFuture<LogicalSlot> getFuture() {
return future;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index bf520f5..ad2a6a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -86,7 +86,7 @@ public interface SlotPoolGateway extends RpcGateway {
// allocating and disposing slots
// ------------------------------------------------------------------------
- CompletableFuture<SimpleSlot> allocateSlot(
+ CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestID requestId,
ScheduledUnit task,
ResourceProfile resources,
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index ef988b4..98427c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -45,7 +45,7 @@ public interface SlotProvider {
* @param preferredLocations preferred locations for the slot allocation
* @return The future of the allocation
*/
- CompletableFuture<SimpleSlot> allocateSlot(
+ CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations);
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 1995c12..2715146 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -133,7 +134,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
@Override
- public CompletableFuture<SimpleSlot> allocateSlot(
+ public CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
@@ -146,7 +147,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
}
else if (ret instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
- CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
+ CompletableFuture<LogicalSlot> typed = (CompletableFuture<LogicalSlot>) ret;
return typed;
}
else {
@@ -321,7 +322,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
else {
// no resource available now, so queue the request
if (queueIfNoResource) {
- CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
+ CompletableFuture<LogicalSlot> future = new CompletableFuture<>();
this.taskQueue.add(new QueuedTask(task, future));
return future;
}
@@ -837,10 +838,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
private final ScheduledUnit task;
- private final CompletableFuture<SimpleSlot> future;
+ private final CompletableFuture<LogicalSlot> future;
- public QueuedTask(ScheduledUnit task, CompletableFuture<SimpleSlot> future) {
+ public QueuedTask(ScheduledUnit task, CompletableFuture<LogicalSlot> future) {
this.task = task;
this.future = future;
}
@@ -849,7 +850,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
return task;
}
- public CompletableFuture<SimpleSlot> getFuture() {
+ public CompletableFuture<LogicalSlot> getFuture() {
return future;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1d4eb6a..687b6d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SlotPool;
import org.apache.flink.runtime.instance.SlotPoolGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -444,7 +444,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
}
- final Slot slot = execution.getAssignedResource();
+ final LogicalSlot slot = execution.getAssignedResource();
final int taskId = execution.getVertex().getParallelSubtaskIndex();
final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
index 27e8af3..fc2c06f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -60,7 +60,7 @@ public class InputChannelDeploymentDescriptorTest {
ResourceID consumerResourceId = ResourceID.generate();
ExecutionVertex consumer = mock(ExecutionVertex.class);
- SimpleSlot consumerSlot = mockSlot(consumerResourceId);
+ LogicalSlot consumerSlot = mockSlot(consumerResourceId);
// Local and remote channel are only allowed for certain execution
// states.
@@ -86,7 +86,7 @@ public class InputChannelDeploymentDescriptorTest {
InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
new ExecutionEdge[]{localEdge, remoteEdge, unknownEdge},
- consumerSlot,
+ consumerSlot.getTaskManagerLocation().getResourceID(),
allowLazyDeployment);
assertEquals(3, desc.length);
@@ -124,7 +124,7 @@ public class InputChannelDeploymentDescriptorTest {
public void testUnknownChannelWithoutLazyDeploymentThrows() throws Exception {
ResourceID consumerResourceId = ResourceID.generate();
ExecutionVertex consumer = mock(ExecutionVertex.class);
- SimpleSlot consumerSlot = mockSlot(consumerResourceId);
+ LogicalSlot consumerSlot = mockSlot(consumerResourceId);
// Unknown partition
ExecutionVertex unknownProducer = mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource
@@ -137,7 +137,7 @@ public class InputChannelDeploymentDescriptorTest {
InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
new ExecutionEdge[]{unknownEdge},
- consumerSlot,
+ consumerSlot.getTaskManagerLocation().getResourceID(),
allowLazyDeployment);
assertEquals(1, desc.length);
@@ -152,7 +152,7 @@ public class InputChannelDeploymentDescriptorTest {
InputChannelDeploymentDescriptor.fromEdges(
new ExecutionEdge[]{unknownEdge},
- consumerSlot,
+ consumerSlot.getTaskManagerLocation().getResourceID(),
allowLazyDeployment);
fail("Did not throw expected ExecutionGraphException");
@@ -162,10 +162,9 @@ public class InputChannelDeploymentDescriptorTest {
// ------------------------------------------------------------------------
- private static SimpleSlot mockSlot(ResourceID resourceId) {
- SimpleSlot slot = mock(SimpleSlot.class);
+ private static LogicalSlot mockSlot(ResourceID resourceId) {
+ LogicalSlot slot = mock(LogicalSlot.class);
when(slot.getTaskManagerLocation()).thenReturn(new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 5000));
- when(slot.getTaskManagerID()).thenReturn(resourceId);
return slot;
}
@@ -178,7 +177,7 @@ public class InputChannelDeploymentDescriptorTest {
when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
if (resourceId != null) {
- SimpleSlot slot = mockSlot(resourceId);
+ LogicalSlot slot = mockSlot(resourceId);
when(exec.getAssignedResource()).thenReturn(slot);
when(vertex.getCurrentAssignedResource()).thenReturn(slot);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index de91d78..b489478 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -587,10 +588,10 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
- final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>(2);
+ final Map<JobVertexID, CompletableFuture<LogicalSlot>[]> slotFutures = new HashMap<>(2);
for (JobVertexID jobVertexID : Arrays.asList(sourceVertexId, sinkVertexId)) {
- CompletableFuture<SimpleSlot>[] slotFutureArray = new CompletableFuture[parallelism];
+ CompletableFuture<LogicalSlot>[] slotFutureArray = new CompletableFuture[parallelism];
for (int i = 0; i < parallelism; i++) {
slotFutureArray[i] = new CompletableFuture<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d3cec30..92c7c61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -18,38 +18,28 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.LogicalSlot;
+import org.apache.flink.runtime.instance.TestingLogicalSlot;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
-import org.mockito.Matchers;
import java.io.IOException;
import java.util.ArrayList;
@@ -87,40 +77,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
Time timeout = Time.seconds(10L);
Scheduler scheduler = mock(Scheduler.class);
- ResourceID taskManagerId = ResourceID.generate();
-
- TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
- when(taskManagerLocation.getResourceID()).thenReturn(taskManagerId);
- when(taskManagerLocation.getHostname()).thenReturn("localhost");
-
- TaskManagerGateway taskManagerGateway = mock(TaskManagerGateway.class);
-
- Instance instance = mock(Instance.class);
- when(instance.getTaskManagerLocation()).thenReturn(taskManagerLocation);
- when(instance.getTaskManagerID()).thenReturn(taskManagerId);
- when(instance.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-
- Slot rootSlot = mock(Slot.class);
-
- AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
- when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
-
- SimpleSlot simpleSlot = mock(SimpleSlot.class);
- when(simpleSlot.isAlive()).thenReturn(true);
- when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation);
- when(simpleSlot.getTaskManagerID()).thenReturn(taskManagerId);
- when(simpleSlot.getTaskManagerGateway()).thenReturn(taskManagerGateway);
- when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
- when(simpleSlot.getRoot()).thenReturn(rootSlot);
- when(simpleSlot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
-
- CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
- future.complete(simpleSlot);
- when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
-
- when(rootSlot.getSlotNumber()).thenReturn(0);
-
- when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+ CompletableFuture<LogicalSlot> slotFuture1 = CompletableFuture.completedFuture(new TestingLogicalSlot());
+ CompletableFuture<LogicalSlot> slotFuture2 = CompletableFuture.completedFuture(new TestingLogicalSlot());
+ when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(slotFuture1, slotFuture2);
TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
@@ -130,7 +89,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
jobGraph.getJobID(),
jobGraph.getName(),
jobConfig,
- new SerializedValue<ExecutionConfig>(null),
+ new SerializedValue<>(null),
timeout,
testingRestartStrategy,
scheduler);
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 90136a6..71ca3a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -109,8 +110,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final JobID jobId = new JobID();
final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
- final CompletableFuture<SimpleSlot> sourceFuture = new CompletableFuture<>();
- final CompletableFuture<SimpleSlot> targetFuture = new CompletableFuture<>();
+ final CompletableFuture<LogicalSlot> sourceFuture = new CompletableFuture<>();
+ final CompletableFuture<LogicalSlot> targetFuture = new CompletableFuture<>();
ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
@@ -177,9 +178,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
@SuppressWarnings({"unchecked", "rawtypes"})
- final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
+ final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
- final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
+ final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];
//
// Create the slots, futures, and the slot provider
@@ -283,9 +284,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
- final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
+ final CompletableFuture<LogicalSlot>[] sourceFutures = new CompletableFuture[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
- final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
+ final CompletableFuture<LogicalSlot>[] targetFutures = new CompletableFuture[parallelism];
for (int i = 0; i < parallelism; i++) {
sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
@@ -358,7 +359,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
final SimpleSlot[] slots = new SimpleSlot[parallelism];
@SuppressWarnings({"unchecked", "rawtypes"})
- final CompletableFuture<SimpleSlot>[] slotFutures = new CompletableFuture[parallelism];
+ final CompletableFuture<LogicalSlot>[] slotFutures = new CompletableFuture[parallelism];
for (int i = 0; i < parallelism; i++) {
slots[i] = createSlot(taskManager, jobId, slotOwner);
@@ -392,7 +393,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
// verify that no deployments have happened
verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
- for (CompletableFuture<SimpleSlot> future : slotFutures) {
+ for (CompletableFuture<LogicalSlot> future : slotFutures) {
if (future.isDone()) {
assertTrue(future.get().isCanceled());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index fa845cf..43a6432 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -62,7 +63,7 @@ public class ExecutionTest extends TestLogger {
final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
jobVertex.setInvokableClass(NoOpInvokable.class);
- final CompletableFuture<SimpleSlot> slotFuture = new CompletableFuture<>();
+ final CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<>();
final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
slotProvider.addSlot(jobVertexId, 0, slotFuture);
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index cf08687..973c7d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -374,12 +374,8 @@ public class ExecutionVertexDeploymentTest extends TestLogger {
AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
- Slot root = mock(Slot.class);
- when(root.getSlotNumber()).thenReturn(1);
- SimpleSlot slot = mock(SimpleSlot.class);
- when(slot.getRoot()).thenReturn(root);
- when(slot.getAllocatedSlot()).thenReturn(allocatedSlot);
- when(root.getAllocatedSlot()).thenReturn(allocatedSlot);
+ LogicalSlot slot = mock(LogicalSlot.class);
+ when(slot.getAllocationId()).thenReturn(new AllocationID());
for (ScheduleMode mode : ScheduleMode.values()) {
vertex.getExecutionGraph().setScheduleMode(mode);
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index 5d7fa1f..24affad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
class ProgrammedSlotProvider implements SlotProvider {
- private final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>();
+ private final Map<JobVertexID, CompletableFuture<LogicalSlot>[]> slotFutures = new HashMap<>();
private final Map<JobVertexID, CompletableFuture<Boolean>[]> slotFutureRequested = new HashMap<>();
@@ -49,17 +49,17 @@ class ProgrammedSlotProvider implements SlotProvider {
this.parallelism = parallelism;
}
- public void addSlot(JobVertexID vertex, int subtaskIndex, CompletableFuture<SimpleSlot> future) {
+ public void addSlot(JobVertexID vertex, int subtaskIndex, CompletableFuture<LogicalSlot> future) {
checkNotNull(vertex);
checkNotNull(future);
checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
- CompletableFuture<SimpleSlot>[] futures = slotFutures.get(vertex);
+ CompletableFuture<LogicalSlot>[] futures = slotFutures.get(vertex);
CompletableFuture<Boolean>[] requestedFutures = slotFutureRequested.get(vertex);
if (futures == null) {
@SuppressWarnings("unchecked")
- CompletableFuture<SimpleSlot>[] newArray = (CompletableFuture<SimpleSlot>[]) new CompletableFuture<?>[parallelism];
+ CompletableFuture<LogicalSlot>[] newArray = (CompletableFuture<LogicalSlot>[]) new CompletableFuture<?>[parallelism];
futures = newArray;
slotFutures.put(vertex, futures);
@@ -71,7 +71,7 @@ class ProgrammedSlotProvider implements SlotProvider {
requestedFutures[subtaskIndex] = new CompletableFuture<>();
}
- public void addSlots(JobVertexID vertex, CompletableFuture<SimpleSlot>[] futures) {
+ public void addSlots(JobVertexID vertex, CompletableFuture<LogicalSlot>[] futures) {
checkNotNull(vertex);
checkNotNull(futures);
checkArgument(futures.length == parallelism);
@@ -92,16 +92,16 @@ class ProgrammedSlotProvider implements SlotProvider {
}
@Override
- public CompletableFuture<SimpleSlot> allocateSlot(
+ public CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
- CompletableFuture<SimpleSlot>[] forTask = slotFutures.get(vertexId);
+ CompletableFuture<LogicalSlot>[] forTask = slotFutures.get(vertexId);
if (forTask != null) {
- CompletableFuture<SimpleSlot> future = forTask[subtask];
+ CompletableFuture<LogicalSlot> future = forTask[subtask];
if (future != null) {
slotFutureRequested.get(vertexId)[subtask].complete(true);
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index a2323bf..3d28983 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -71,7 +72,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
}
@Override
- public CompletableFuture<SimpleSlot> allocateSlot(
+ public CompletableFuture<LogicalSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 0edef5e..4a6bf75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -38,7 +39,7 @@ import static org.junit.Assert.*;
/**
* Tests for the allocation, properties, and release of shared slots.
*/
-public class SharedSlotsTest {
+public class SharedSlotsTest extends TestLogger {
private static final Iterable<TaskManagerLocation> NO_LOCATION = Collections.emptySet();
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index fd1c17b..db71210 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -29,11 +29,13 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import org.mockito.Matchers;
-public class SimpleSlotTest {
+public class SimpleSlotTest extends TestLogger {
@Test
public void testStateTransitions() {
@@ -81,11 +83,11 @@ public class SimpleSlotTest {
{
SimpleSlot slot = getSlot();
- assertTrue(slot.setExecutedVertex(ev));
+ assertTrue(slot.setExecution(ev));
assertEquals(ev, slot.getExecutedVertex());
// try to add another one
- assertFalse(slot.setExecutedVertex(ev_2));
+ assertFalse(slot.setExecution(ev_2));
assertEquals(ev, slot.getExecutedVertex());
}
@@ -94,7 +96,7 @@ public class SimpleSlotTest {
SimpleSlot slot = getSlot();
assertTrue(slot.markCancelled());
- assertFalse(slot.setExecutedVertex(ev));
+ assertFalse(slot.setExecution(ev));
assertNull(slot.getExecutedVertex());
}
@@ -104,7 +106,7 @@ public class SimpleSlotTest {
assertTrue(slot.markCancelled());
assertTrue(slot.markReleased());
- assertFalse(slot.setExecutedVertex(ev));
+ assertFalse(slot.setExecution(ev));
assertNull(slot.getExecutedVertex());
}
@@ -113,7 +115,7 @@ public class SimpleSlotTest {
SimpleSlot slot = getSlot();
slot.releaseSlot();
- assertFalse(slot.setExecutedVertex(ev));
+ assertFalse(slot.setExecution(ev));
assertNull(slot.getExecutedVertex());
}
}
@@ -129,7 +131,7 @@ public class SimpleSlotTest {
Execution ev = mock(Execution.class);
SimpleSlot slot = getSlot();
- assertTrue(slot.setExecutedVertex(ev));
+ assertTrue(slot.setExecution(ev));
assertEquals(ev, slot.getExecutedVertex());
slot.releaseSlot();
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index ca5d826..8875e00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -107,7 +107,7 @@ public class SlotPoolRpcTest extends TestLogger {
try {
pool.start(JobMasterId.generate(), "foobar");
- CompletableFuture<SimpleSlot> future = pool.allocateSlot(
+ CompletableFuture<LogicalSlot> future = pool.allocateSlot(
new SlotPoolGateway.SlotRequestID(),
new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
DEFAULT_TESTING_PROFILE,
@@ -142,7 +142,7 @@ public class SlotPoolRpcTest extends TestLogger {
SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class);
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
- CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+ CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
requestId,
new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
DEFAULT_TESTING_PROFILE,
@@ -186,7 +186,7 @@ public class SlotPoolRpcTest extends TestLogger {
pool.connectToResourceManager(resourceManagerGateway);
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
- CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+ CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
requestId,
new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
DEFAULT_TESTING_PROFILE,
@@ -237,7 +237,7 @@ public class SlotPoolRpcTest extends TestLogger {
pool.connectToResourceManager(resourceManagerGateway);
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
- CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(
+ CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
requestId,
new ScheduledUnit(SchedulerTestUtils.getDummyTask()),
DEFAULT_TESTING_PROFILE,
@@ -300,7 +300,7 @@ public class SlotPoolRpcTest extends TestLogger {
ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask());
// test the pending request is clear when timed out
- CompletableFuture<SimpleSlot> future = pool.getSlotProvider().allocateSlot(
+ CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot(
mockScheduledUnit,
true,
Collections.emptyList());
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index c307373..271bc2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -96,7 +96,7 @@ public class SlotPoolTest extends TestLogger {
slotPoolGateway.registerTaskManager(resourceID);
SlotPoolGateway.SlotRequestID requestId = new SlotPoolGateway.SlotRequestID();
- CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+ CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(requestId, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -107,13 +107,11 @@ public class SlotPoolTest extends TestLogger {
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
- SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+ LogicalSlot slot = future.get(1, TimeUnit.SECONDS);
assertTrue(future.isDone());
assertTrue(slot.isAlive());
- assertEquals(resourceID, slot.getTaskManagerID());
- assertEquals(jobId, slot.getJobID());
- assertEquals(slotPool.getSlotOwner(), slot.getOwner());
- assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocatedSlot().getSlotAllocationId()), slot);
+ assertEquals(resourceID, slot.getTaskManagerLocation().getResourceID());
+ assertEquals(slotPool.getAllocatedSlots().get(slot.getAllocationId()), slot);
} finally {
slotPool.shutDown();
}
@@ -129,8 +127,8 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
- CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
- CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+ CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+ CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future1.isDone());
assertFalse(future2.isDone());
@@ -144,7 +142,7 @@ public class SlotPoolTest extends TestLogger {
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
- SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+ LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
assertFalse(future2.isDone());
@@ -152,15 +150,15 @@ public class SlotPoolTest extends TestLogger {
slot1.releaseSlot();
// second allocation fulfilled by previous slot returning
- SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+ LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
assertTrue(future2.isDone());
assertNotEquals(slot1, slot2);
assertTrue(slot1.isReleased());
assertTrue(slot2.isAlive());
- assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
- assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
- assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocatedSlot().getSlotAllocationId()), slot2);
+ assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
+ assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
+ assertEquals(slotPool.getAllocatedSlots().get(slot1.getAllocationId()), slot2);
} finally {
slotPool.shutDown();
}
@@ -176,7 +174,7 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
- CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+ CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future1.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -187,23 +185,23 @@ public class SlotPoolTest extends TestLogger {
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
- SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+ LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
// return this slot to pool
slot1.releaseSlot();
- CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+ CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
// second allocation fulfilled by previous slot returning
- SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+ LogicalSlot slot2 = future2.get(1, TimeUnit.SECONDS);
assertTrue(future2.isDone());
assertNotEquals(slot1, slot2);
assertTrue(slot1.isReleased());
assertTrue(slot2.isAlive());
- assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
- assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+ assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
+ assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
} finally {
slotPool.shutDown();
}
@@ -219,7 +217,7 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
- CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+ CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
assertFalse(future.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -240,7 +238,7 @@ public class SlotPoolTest extends TestLogger {
// accepted slot
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
- SimpleSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ LogicalSlot slot = future.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(slot.isAlive());
// duplicated offer with using slot
@@ -275,19 +273,19 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPoolGateway.registerTaskManager(resourceID);
- CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+ CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
- CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
+ CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(new SlotPoolGateway.SlotRequestID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout);
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
- SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+ LogicalSlot slot1 = future1.get(1, TimeUnit.SECONDS);
assertTrue(future1.isDone());
assertFalse(future2.isDone());
@@ -332,7 +330,7 @@ public class SlotPoolTest extends TestLogger {
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
- CompletableFuture<SimpleSlot> slotFuture = slotPoolGateway.allocateSlot(
+ CompletableFuture<LogicalSlot> slotFuture = slotPoolGateway.allocateSlot(
new SlotPoolGateway.SlotRequestID(),
scheduledUnit,
ResourceProfile.UNKNOWN,
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
new file mode 100644
index 0000000..36a47b7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/TestingLogicalSlot.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Minimal {@link LogicalSlot} for tests: hands back fixed values and counts as released once {@link #releaseSlot()} has been called.
+ */
+public class TestingLogicalSlot implements LogicalSlot {
+
+ private final TaskManagerLocation taskManagerLocation; // returned as-is by getTaskManagerLocation()
+
+ private final TaskManagerGateway taskManagerGateway; // returned as-is by getTaskManagerGateway()
+
+ private final CompletableFuture<?> releaseFuture; // completed by releaseSlot(); drives isAlive/isCanceled/isReleased
+
+ private final AtomicReference<Execution> executionReference; // starts empty; setExecution() only writes while it is still null
+
+ private final int slotNumber; // reported by getPhysicalSlotNumber()
+
+ private final AllocationID allocationId; // reported by getAllocationId()
+
+ public TestingLogicalSlot() { // convenience constructor that supplies default collaborators
+ this(
+ new LocalTaskManagerLocation(),
+ new SimpleAckingTaskManagerGateway(),
+ 0, // slot number
+ new AllocationID()); // a new AllocationID instance for every slot built this way
+ }
+
+ public TestingLogicalSlot( // reference arguments are passed through Preconditions.checkNotNull
+ TaskManagerLocation taskManagerLocation,
+ TaskManagerGateway taskManagerGateway,
+ int slotNumber,
+ AllocationID allocationId) {
+ this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
+ this.taskManagerGateway = Preconditions.checkNotNull(taskManagerGateway);
+ this.releaseFuture = new CompletableFuture<>(); // not completed yet, so the slot starts out alive
+ this.executionReference = new AtomicReference<>(); // no execution assigned yet
+ this.slotNumber = slotNumber;
+ this.allocationId = Preconditions.checkNotNull(allocationId);
+ }
+
+ @Override
+ public TaskManagerLocation getTaskManagerLocation() {
+ return taskManagerLocation;
+ }
+
+ @Override
+ public TaskManagerGateway getTaskManagerGateway() {
+ return taskManagerGateway;
+ }
+
+ @Override
+ public boolean isAlive() { // true until releaseSlot() has completed the release future
+ return !releaseFuture.isDone();
+ }
+
+ @Override
+ public boolean isCanceled() { // NOTE: same condition as isReleased(); this stub has no separate canceling state
+ return releaseFuture.isDone();
+ }
+
+ @Override
+ public boolean isReleased() { // true once releaseSlot() has been called
+ return releaseFuture.isDone();
+ }
+
+ @Override
+ public boolean setExecution(Execution execution) {
+ return executionReference.compareAndSet(null, execution); // returns false if an execution is already stored
+ }
+
+ @Override
+ public void releaseSlot() {
+ releaseFuture.complete(null); // calling again is harmless: an already completed future is left unchanged
+ }
+
+ @Override
+ public int getPhysicalSlotNumber() {
+ return slotNumber;
+ }
+
+ @Override
+ public AllocationID getAllocationId() {
+ return allocationId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 0346e48..f57726c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -32,11 +32,13 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-public class PartialConsumePipelinedResultTest {
+public class PartialConsumePipelinedResultTest extends TestLogger {
// Test configuration
private final static int NUMBER_OF_TMS = 1;
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9c64b1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index 9c781ec1..d861455 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.types.IntValue;
+import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -39,7 +40,7 @@ import java.util.List;
import static org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest.SubtaskIndexReceiver.CONFIG_KEY;
-public class ScheduleOrUpdateConsumersTest {
+public class ScheduleOrUpdateConsumersTest extends TestLogger {
private final static int NUMBER_OF_TMS = 2;
private final static int NUMBER_OF_SLOTS_PER_TM = 2;