You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/21 12:22:18 UTC
[38/50] [abbrv] flink git commit: [FLINK-4360] [tm] Implement TM -> JM registration logic
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
new file mode 100644
index 0000000..66d8102
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+/**
+ * Exception indicating that the slot allocation on the task manager failed.
+ */
+public class SlotAllocationException extends TaskManagerException {
+
+ private static final long serialVersionUID = -4764932098204266773L;
+
+ public SlotAllocationException(String message) {
+ super(message);
+ }
+
+ public SlotAllocationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SlotAllocationException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 42cb919..88123b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -70,10 +70,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
/** Interface for slot actions, such as freeing them or timing them out */
private SlotActions slotActions;
-
- /** The timeout for allocated slots */
- private Time slotTimeout;
-
+
/** Whether the table has been started */
private boolean started;
@@ -104,7 +101,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
slotsPerJob = new HashMap<>(4);
slotActions = null;
- slotTimeout = null;
started = false;
}
@@ -112,11 +108,9 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* Start the task slot table with the given slot actions and slot timeout value.
*
* @param initialSlotActions to use for slot actions
- * @param initialSlotTimeout to use for slot timeouts
*/
- public void start(SlotActions initialSlotActions, Time initialSlotTimeout) {
+ public void start(SlotActions initialSlotActions) {
this.slotActions = Preconditions.checkNotNull(initialSlotActions);
- this.slotTimeout = Preconditions.checkNotNull(initialSlotTimeout);
timerService.start(this);
@@ -129,7 +123,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
public void stop() {
started = false;
timerService.stop();
- slotTimeout = null;
slotActions = null;
}
@@ -144,9 +137,10 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @param index of the task slot to allocate
* @param jobId to allocate the task slot for
* @param allocationId identifying the allocation
+ * @param slotTimeout until the slot times out
* @return True if the task slot could be allocated; otherwise false
*/
- public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) {
+ public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
checkInit();
TaskSlot taskSlot = taskSlots.get(index);
@@ -180,7 +174,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
*
* @param allocationId to identify the task slot to mark as active
* @throws SlotNotFoundException if the slot could not be found for the given allocation id
- * @return True if the slot could be marked active
+ * @return True if the slot could be marked active; otherwise false
*/
public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
checkInit();
@@ -190,6 +184,8 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
if (taskSlot != null) {
if (taskSlot.markActive()) {
// unregister a potential timeout
+ LOG.info("Activate slot {}.", allocationId);
+
timerService.unregisterTimeout(allocationId);
return true;
@@ -206,10 +202,11 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* then a {@link SlotNotFoundException} is thrown.
*
* @param allocationId to identify the task slot to mark as inactive
+ * @param slotTimeout until the slot times out
* @throws SlotNotFoundException if the slot could not be found for the given allocation id
* @return True if the slot could be marked inactive
*/
- public boolean markSlotInactive(AllocationID allocationId) throws SlotNotFoundException {
+ public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
checkInit();
TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -253,6 +250,12 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
*/
public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
checkInit();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Free slot {}.", allocationId, cause);
+ } else {
+ LOG.info("Free slot {}.", allocationId);
+ }
TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -322,8 +325,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @return True if the given task slot is allocated for the given job and allocation id
*/
public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
- checkInit();
-
TaskSlot taskSlot = taskSlots.get(index);
return taskSlot.isAllocated(jobId, allocationId);
@@ -336,7 +337,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @param allocationId identifying the allocation
* @return True if there exists a task slot which is active for the given job and allocation id.
*/
- public boolean existActiveSlot(JobID jobId, AllocationID allocationId) {
+ public boolean existsActiveSlot(JobID jobId, AllocationID allocationId) {
TaskSlot taskSlot = getTaskSlot(allocationId);
if (taskSlot != null) {
@@ -431,6 +432,8 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @return The removed task if there is any for the given execution attempt id; otherwise null
*/
public Task removeTask(ExecutionAttemptID executionAttemptID) {
+ checkInit();
+
TaskSlotMapping taskSlotMapping = taskSlotMappings.remove(executionAttemptID);
if (taskSlotMapping != null) {
@@ -481,6 +484,8 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
@Override
public void notifyTimeout(AllocationID key, UUID ticket) {
+ checkInit();
+
if (slotActions != null) {
slotActions.timeoutSlot(key, ticket);
}
@@ -493,9 +498,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
private TaskSlot getTaskSlot(AllocationID allocationId) {
Preconditions.checkNotNull(allocationId);
- TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId);
-
- return taskSlot;
+ return allocationIDTaskSlotMap.get(allocationId);
}
private void checkInit() {
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f16255e..ccca3c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -265,7 +265,7 @@ public class Task implements Runnable, TaskActions {
this.jobId = checkNotNull(tdd.getJobID());
this.vertexId = checkNotNull(tdd.getVertexID());
this.executionId = checkNotNull(tdd.getExecutionId());
- this.allocationId = checkNotNull(tdd.getAllocationID());
+ this.allocationId = checkNotNull(tdd.getAllocationId());
this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 39ea176..993fd19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -44,6 +45,7 @@ public class TaskDeploymentDescriptorTest {
public void testSerialization() {
try {
final JobID jobID = new JobID();
+ final AllocationID allocationId = new AllocationID();
final JobVertexID vertexID = new JobVertexID();
final ExecutionAttemptID execId = new ExecutionAttemptID();
final String jobName = "job name";
@@ -61,7 +63,7 @@ public class TaskDeploymentDescriptorTest {
final List<URL> requiredClasspaths = new ArrayList<URL>(0);
final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
- final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, jobName, vertexID, execId,
+ final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, allocationId, jobName, vertexID, execId,
executionConfig, taskName, numberOfKeyGroups, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber,
jobConfiguration, taskConfiguration, invokableClass.getName(), producedResults, inputGates,
requiredJars, requiredClasspaths, 47);
@@ -69,12 +71,14 @@ public class TaskDeploymentDescriptorTest {
final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
assertFalse(orig.getJobID() == copy.getJobID());
+ assertFalse(orig.getAllocationId() == copy.getAllocationId());
assertFalse(orig.getVertexID() == copy.getVertexID());
assertFalse(orig.getTaskName() == copy.getTaskName());
assertFalse(orig.getJobConfiguration() == copy.getJobConfiguration());
assertFalse(orig.getTaskConfiguration() == copy.getTaskConfiguration());
assertEquals(orig.getJobID(), copy.getJobID());
+ assertEquals(orig.getAllocationId(), copy.getAllocationId());
assertEquals(orig.getVertexID(), copy.getVertexID());
assertEquals(orig.getTaskName(), copy.getTaskName());
assertEquals(orig.getNumberOfKeyGroups(), copy.getNumberOfKeyGroups());
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index a255027..38e372d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) throws Exception {
LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
if (service != null) {
return service;
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 49f2268..073aeac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -75,7 +76,8 @@ public class TaskManagerGroupTest extends TestLogger {
final ExecutionAttemptID execution21 = new ExecutionAttemptID();
TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
- jid1,
+ jid1,
+ new AllocationID(),
jobName1,
vertex11,
execution11,
@@ -91,6 +93,7 @@ public class TaskManagerGroupTest extends TestLogger {
TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
jid1,
+ new AllocationID(),
jobName1,
vertex12,
execution12,
@@ -106,6 +109,7 @@ public class TaskManagerGroupTest extends TestLogger {
TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
jid2,
+ new AllocationID(),
jobName2,
vertex21,
execution21,
@@ -121,6 +125,7 @@ public class TaskManagerGroupTest extends TestLogger {
TaskDeploymentDescriptor tdd4 = new TaskDeploymentDescriptor(
jid1,
+ new AllocationID(),
jobName1,
vertex13,
execution13,
@@ -192,6 +197,7 @@ public class TaskManagerGroupTest extends TestLogger {
TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
jid1,
+ new AllocationID(),
jobName1,
vertex11,
execution11,
@@ -207,6 +213,7 @@ public class TaskManagerGroupTest extends TestLogger {
TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
jid1,
+ new AllocationID(),
jobName1,
vertex12,
execution12,
@@ -222,6 +229,7 @@ public class TaskManagerGroupTest extends TestLogger {
TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
jid2,
+ new AllocationID(),
jobName2,
vertex21,
execution21,
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 558d3c2..948c129 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -68,7 +68,7 @@ public class SlotManagerTest {
taskExecutorRegistration = Mockito.mock(TaskExecutorRegistration.class);
TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway);
- Mockito.when(gateway.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class)))
+ Mockito.when(gateway.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 24d959e..86cd1f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -144,7 +144,7 @@ public class SlotProtocolTest extends TestLogger {
Mockito
.when(
taskExecutorGateway
- .requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class)))
+ .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
testRpcService.registerGateway(tmAddress, taskExecutorGateway);
@@ -161,7 +161,7 @@ public class SlotProtocolTest extends TestLogger {
// 4) Slot becomes available and TaskExecutor gets a SlotRequest
verify(taskExecutorGateway, timeout(5000))
- .requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class));
+ .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
}
/**
@@ -189,7 +189,7 @@ public class SlotProtocolTest extends TestLogger {
TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
Mockito.when(
taskExecutorGateway
- .requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class)))
+ .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
testRpcService.registerGateway(tmAddress, taskExecutorGateway);
@@ -240,7 +240,7 @@ public class SlotProtocolTest extends TestLogger {
// 4) a SlotRequest is routed to the TaskExecutor
verify(taskExecutorGateway, timeout(5000))
- .requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class));
+ .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
}
private static TestingLeaderElectionService configureHA(
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index e7143ae..bbde331 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -287,17 +288,27 @@ public class RpcCompletenessTest extends TestLogger {
if (!futureClass.equals(RpcCompletenessTest.futureClass)) {
return false;
} else {
- Class<?> valueClass = ReflectionUtil.getTemplateType1(gatewayMethod.getGenericReturnType());
+ ReflectionUtil.FullTypeInfo fullValueTypeInfo = ReflectionUtil.getFullTemplateType(gatewayMethod.getGenericReturnType(), 0);
if (endpointMethod.getReturnType().equals(futureClass)) {
- Class<?> rpcEndpointValueClass = ReflectionUtil.getTemplateType1(endpointMethod.getGenericReturnType());
+ ReflectionUtil.FullTypeInfo fullRpcEndpointValueTypeInfo = ReflectionUtil.getFullTemplateType(endpointMethod.getGenericReturnType(), 0);
// check if we have the same future value types
- if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) {
- return false;
+ if (fullValueTypeInfo != null && fullRpcEndpointValueTypeInfo != null) {
+ Iterator<Class<?>> valueClasses = fullValueTypeInfo.getClazzIterator();
+ Iterator<Class<?>> rpcClasses = fullRpcEndpointValueTypeInfo.getClazzIterator();
+
+ while (valueClasses.hasNext() && rpcClasses.hasNext()) {
+ if (!checkType(valueClasses.next(), rpcClasses.next())) {
+ return false;
+ }
+ }
+
+ // both should be empty
+ return !valueClasses.hasNext() && !rpcClasses.hasNext();
}
} else {
- if (valueClass != null && !checkType(valueClass, endpointMethod.getReturnType())) {
+ if (fullValueTypeInfo != null && !checkType(fullValueTypeInfo.getClazz(), endpointMethod.getReturnType())) {
return false;
}
}
@@ -342,16 +353,16 @@ public class RpcCompletenessTest extends TestLogger {
if (method.getReturnType().equals(Void.TYPE)) {
builder.append("void").append(" ");
} else if (method.getReturnType().equals(futureClass)) {
- Class<?> valueClass = ReflectionUtil.getTemplateType1(method.getGenericReturnType());
+ ReflectionUtil.FullTypeInfo fullTypeInfo = ReflectionUtil.getFullTemplateType(method.getGenericReturnType(), 0);
builder
.append(futureClass.getSimpleName())
.append("<")
- .append(valueClass != null ? valueClass.getSimpleName() : "")
+ .append(fullTypeInfo != null ? fullTypeInfo.toString() : "")
.append(">");
- if (valueClass != null) {
- builder.append("/").append(valueClass.getSimpleName());
+ if (fullTypeInfo != null) {
+ builder.append("/").append(fullTypeInfo);
}
builder.append(" ");
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index baae251..23c6833 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -18,21 +18,45 @@
package org.apache.flink.runtime.taskexecutor;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.NonHaServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
@@ -40,19 +64,42 @@ import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.Matchers;
import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
+import static org.hamcrest.Matchers.contains;
public class TaskExecutorTest extends TestLogger {
+ @Rule
+ public TestName name = new TestName();
+
+
@Test
public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
final ResourceID resourceID = ResourceID.generate();
@@ -85,6 +132,8 @@ public class TaskExecutorTest extends TestLogger {
mock(BroadcastVariableManager.class),
mock(FileCache.class),
mock(TaskSlotTable.class),
+ mock(JobManagerTable.class),
+ mock(JobLeaderService.class),
mock(FatalErrorHandler.class));
taskManager.start();
@@ -142,6 +191,8 @@ public class TaskExecutorTest extends TestLogger {
mock(BroadcastVariableManager.class),
mock(FileCache.class),
mock(TaskSlotTable.class),
+ mock(JobManagerTable.class),
+ mock(JobLeaderService.class),
mock(FatalErrorHandler.class));
taskManager.start();
@@ -173,17 +224,385 @@ public class TaskExecutorTest extends TestLogger {
}
/**
+ * Tests that we can submit a task to the TaskManager given that we've allocated a slot there.
+ *
+ * <p>The {@link TaskSlotTable} is mocked so that it reports an active slot for the job and
+ * allocation id used by the task deployment descriptor and accepts the added task, while the
+ * job manager connection is registered in a real {@link JobManagerTable} under the job id.
+ *
+ * <p>The test passes once {@link TestInvokable#invoke()} has completed its static future. It
+ * fails if that does not happen within the 1000 ms timeout declared on the test annotation.
+ */
+ @Test(timeout = 1000L)
+ public void testTaskSubmission() throws Exception {
+ final Configuration configuration = new Configuration();
+
+ final TestingSerialRpcService rpc = new TestingSerialRpcService();
+ final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+ final JobID jobId = new JobID();
+ final AllocationID allocationId = new AllocationID();
+ final UUID jobManagerLeaderId = UUID.randomUUID();
+ final JobVertexID jobVertexId = new JobVertexID();
+
+ final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+ jobId,
+ allocationId,
+ name.getMethodName(),
+ jobVertexId,
+ new ExecutionAttemptID(),
+ new SerializedValue<>(new ExecutionConfig()),
+ "test task",
+ 1,
+ 0,
+ 1,
+ 0,
+ configuration,
+ configuration,
+ TestInvokable.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0);
+
+ final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
+ when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+
+ final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+ mock(JobMasterGateway.class),
+ jobManagerLeaderId,
+ mock(TaskManagerActions.class),
+ mock(CheckpointResponder.class),
+ libraryCacheManager,
+ mock(ResultPartitionConsumableNotifier.class),
+ mock(PartitionStateChecker.class));
+
+ final JobManagerTable jobManagerTable = new JobManagerTable();
+ jobManagerTable.put(jobId, jobManagerConnection);
+
+ final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+ when(taskSlotTable.existsActiveSlot(eq(jobId), eq(allocationId))).thenReturn(true);
+ when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
+
+ final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+
+ when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class));
+
+ final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
+
+ when(taskManagerMetricGroup.addTaskForJob(eq(tdd))).thenReturn(mock(TaskMetricGroup.class));
+
+ final HighAvailabilityServices haServices = mock(HighAvailabilityServices.class);
+ when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
+
+ try {
+
+ TaskExecutor taskManager = new TaskExecutor(
+ taskManagerConfiguration,
+ mock(TaskManagerLocation.class),
+ rpc,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ networkEnvironment,
+ haServices,
+ mock(MetricRegistry.class),
+ taskManagerMetricGroup,
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
+ taskSlotTable,
+ jobManagerTable,
+ mock(JobLeaderService.class),
+ mock(FatalErrorHandler.class));
+
+ taskManager.start();
+
+ taskManager.submitTask(tdd, jobManagerLeaderId);
+
+ Future<Boolean> completionFuture = TestInvokable.completableFuture;
+
+ completionFuture.get();
+
+ } finally {
+ rpc.stopService();
+ }
+ }
+
+ /**
+ * Test invokable which completes a static, class-wide future when executed.
+ *
+ * <p>Because {@code completableFuture} is {@code static final}, it is shared by every instance
+ * of this class and is never replaced by a fresh future.
+ */
+ public static class TestInvokable extends AbstractInvokable {
+
+ static final CompletableFuture<Boolean> completableFuture = new FlinkCompletableFuture<>();
+
+ @Override
+ public void invoke() throws Exception {
+ completableFuture.complete(true);
+ }
+ }
+
+ /**
+ * Tests that a TaskManager detects a job leader for which it has reserved slots. Upon detecting
+ * the job leader, it will offer all reserved slots to the JobManager.
+ *
+ * <p>The test first announces the resource manager leader, requests a slot for the job, then
+ * announces the job manager leader and verifies that the allocation id of the requested slot is
+ * offered to the job master gateway.
+ */
+ @Test
+ public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingException, SlotAllocationException {
+ final JobID jobId = new JobID();
+
+ final TestingSerialRpcService rpc = new TestingSerialRpcService();
+ final Configuration configuration = new Configuration();
+ final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+ final ResourceID resourceId = new ResourceID("foobar");
+ final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
+ final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ final TimerService<AllocationID> timerService = mock(TimerService.class);
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService);
+ final JobManagerTable jobManagerTable = new JobManagerTable();
+ final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+ final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
+ final TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
+ haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
+ haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
+
+ final String resourceManagerAddress = "rm";
+ final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+ final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
+ final InstanceID registrationId = new InstanceID();
+
+ when(resourceManagerGateway.registerTaskExecutor(
+ eq(resourceManagerLeaderId),
+ any(String.class),
+ eq(resourceId),
+ any(SlotReport.class),
+ any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+
+ final String jobManagerAddress = "jm";
+ final UUID jobManagerLeaderId = UUID.randomUUID();
+ final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
+ final int blobPort = 42;
+
+ final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+
+ when(jobMasterGateway.registerTaskManager(
+ any(String.class),
+ eq(resourceId),
+ eq(jobManagerLeaderId),
+ any(Time.class)
+ )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+ when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+
+ rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
+ rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+
+ final AllocationID allocationId = new AllocationID();
+ final SlotID slotId = new SlotID(resourceId, 0);
+
+ try {
+ TaskExecutor taskManager = new TaskExecutor(
+ taskManagerConfiguration,
+ taskManagerLocation,
+ rpc,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ haServices,
+ mock(MetricRegistry.class),
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
+ taskSlotTable,
+ jobManagerTable,
+ jobLeaderService,
+ testingFatalErrorHandler);
+
+ taskManager.start();
+
+ // tell the task manager about the rm leader
+ resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId);
+
+ // request slots from the task manager under the given allocation id
+ TMSlotRequestReply reply = taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, resourceManagerLeaderId);
+
+ // this is hopefully successful :-)
+ assertTrue(reply instanceof TMSlotRequestRegistered);
+
+ // now inform the task manager about the new job leader
+ jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, jobManagerLeaderId);
+
+ // the job leader should get the allocation id offered
+ verify(jobMasterGateway).offerSlots((Iterable<AllocationID>)Matchers.argThat(contains(allocationId)), eq(jobManagerLeaderId), any(Time.class));
+ } finally {
+ // stop the rpc service first so that it is released even if the error check below throws
+ rpc.stopService();
+
+ // check if a concurrent error occurred
+ testingFatalErrorHandler.rethrowException();
+ }
+ }
+
+ /**
+ * Tests that accepted slots go into state assigned and the others are returned to the resource
+ * manager.
+ *
+ * <p>Two slots are allocated for the job, but the mocked job master only accepts the first
+ * allocation id. The test then verifies that the second slot is reported as available to the
+ * resource manager and is free again in the slot table.
+ */
+ @Test
+ public void testSlotAcceptance() throws Exception {
+ final JobID jobId = new JobID();
+
+ final TestingSerialRpcService rpc = new TestingSerialRpcService();
+ final Configuration configuration = new Configuration();
+ final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+ final ResourceID resourceId = new ResourceID("foobar");
+ final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
+ final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ final TimerService<AllocationID> timerService = mock(TimerService.class);
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService);
+ final JobManagerTable jobManagerTable = new JobManagerTable();
+ final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+ final String resourceManagerAddress = "rm";
+ final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+ final String jobManagerAddress = "jm";
+ final UUID jobManagerLeaderId = UUID.randomUUID();
+
+ final LeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId);
+ final LeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId);
+ haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
+ haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
+
+ final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
+ final InstanceID registrationId = new InstanceID();
+
+ when(resourceManagerGateway.registerTaskExecutor(
+ eq(resourceManagerLeaderId),
+ any(String.class),
+ eq(resourceId),
+ any(SlotReport.class),
+ any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+
+ final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
+ final int blobPort = 42;
+
+ final AllocationID allocationId1 = new AllocationID();
+ final AllocationID allocationId2 = new AllocationID();
+
+ final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+
+ when(jobMasterGateway.registerTaskManager(
+ any(String.class),
+ eq(resourceId),
+ eq(jobManagerLeaderId),
+ any(Time.class)
+ )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+ when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+
+ when(jobMasterGateway.offerSlots(any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.completed((Iterable<AllocationID>)Collections.singleton(allocationId1)));
+
+ rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
+ rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+
+ try {
+ TaskExecutor taskManager = new TaskExecutor(
+ taskManagerConfiguration,
+ taskManagerLocation,
+ rpc,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ haServices,
+ mock(MetricRegistry.class),
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
+ taskSlotTable,
+ jobManagerTable,
+ jobLeaderService,
+ testingFatalErrorHandler);
+
+ taskManager.start();
+
+ taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L));
+ taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds(10000L));
+
+ // we have to add the job after the TaskExecutor, because otherwise the service has not
+ // been properly started.
+ jobLeaderService.addJob(jobId, jobManagerAddress);
+
+ verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1)));
+
+ assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
+ assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
+ assertTrue(taskSlotTable.isSlotFree(1));
+ } finally {
+ // stop the rpc service first so that it is released even if the error check below throws
+ rpc.stopService();
+
+ // check if a concurrent error occurred
+ testingFatalErrorHandler.rethrowException();
+ }
+ }
+
+ /**
+ * {@link FatalErrorHandler} for tests which remembers the first reported fatal error so that
+ * the test can inspect it or rethrow it wrapped in a {@link TestingException}.
+ */
+ private static class TestingFatalErrorHandler implements FatalErrorHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(TestingFatalErrorHandler.class);
+
+ /** First error handed to {@link #onFatalError(Throwable)}; holds null until then. */
+ private final AtomicReference<Throwable> firstFatalError = new AtomicReference<>(null);
+
+ public TestingFatalErrorHandler() {
+ }
+
+ /**
+ * Throws a {@link TestingException} wrapping the recorded error, if there is one.
+ *
+ * @throws TestingException if a fatal error has been recorded
+ */
+ public void rethrowException() throws TestingException {
+ final Throwable recorded = getException();
+
+ if (recorded == null) {
+ return;
+ }
+
+ throw new TestingException(recorded);
+ }
+
+ /**
+ * @return true if a fatal error has been recorded
+ */
+ public boolean hasExceptionOccurred() {
+ return getException() != null;
+ }
+
+ /**
+ * @return the recorded fatal error or null if none has been recorded
+ */
+ public Throwable getException() {
+ return firstFatalError.get();
+ }
+
+ /**
+ * Logs the error and records it unless an earlier error has already been recorded.
+ *
+ * @param exception the reported fatal error
+ */
+ @Override
+ public void onFatalError(Throwable exception) {
+ LOG.error("OnFatalError:", exception);
+ firstFatalError.compareAndSet(null, exception);
+ }
+
+ //------------------------------------------------------------------
+ // static utility classes
+ //------------------------------------------------------------------
+
+ /**
+ * Exception type used to surface a recorded fatal error to the test.
+ */
+ private static final class TestingException extends Exception {
+ private static final long serialVersionUID = -4648195335470914498L;
+
+ public TestingException(Throwable cause) {
+ super(cause);
+ }
+
+ public TestingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TestingException(String message) {
+ super(message);
+ }
+ }
+ }
+
+ /**
* Tests that all allocation requests for slots are ignored if the slot has been reported as
* free by the TaskExecutor but this report hasn't been confirmed by the ResourceManager.
*
* This is essential for the correctness of the state of the ResourceManager.
*/
+ @Ignore
@Test
- public void testRejectAllocationRequestsForOutOfSyncSlots() {
+ public void testRejectAllocationRequestsForOutOfSyncSlots() throws SlotAllocationException {
final ResourceID resourceID = ResourceID.generate();
final String address1 = "/resource/manager/address/one";
final UUID leaderId = UUID.randomUUID();
+ final JobID jobId = new JobID();
+ final String jobManagerAddress = "foobar";
final TestingSerialRpcService rpc = new TestingSerialRpcService();
try {
@@ -215,6 +634,8 @@ public class TaskExecutorTest extends TestLogger {
mock(BroadcastVariableManager.class),
mock(FileCache.class),
mock(TaskSlotTable.class),
+ mock(JobManagerTable.class),
+ mock(JobLeaderService.class),
mock(FatalErrorHandler.class));
taskManager.start();
@@ -232,14 +653,14 @@ public class TaskExecutorTest extends TestLogger {
// test that allocating a slot works
final SlotID slotID = new SlotID(resourceID, 0);
- TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, new AllocationID(), leaderId);
+ TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
assertTrue(tmSlotRequestReply instanceof TMSlotRequestRegistered);
+ // TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't need the SlotID...
// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1);
- taskManager.addUnconfirmedFreeSlotNotification(unconfirmedFreeSlotID);
TMSlotRequestReply tmSlotRequestReply2 =
- taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId);
+ taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
assertTrue(tmSlotRequestReply2 instanceof TMSlotRequestRejected);
// re-register
@@ -250,7 +671,7 @@ public class TaskExecutorTest extends TestLogger {
// now we should be successful because the slots status has been synced
// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
TMSlotRequestReply tmSlotRequestReply3 =
- taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId);
+ taskManager.requestSlot(unconfirmedFreeSlotID, jobId, new AllocationID(), jobManagerAddress, leaderId);
assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 8fa7463..2a9ff61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -160,7 +161,7 @@ public class TaskAsyncCallTest {
.thenReturn(mock(TaskKvStateRegistry.class));
TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
- new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
+ new JobID(), new AllocationID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
new SerializedValue<>(new ExecutionConfig()),
"Test Task", 1, 0, 1, 0,
new Configuration(), new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d4efd24..d1909fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -163,12 +164,13 @@ public class TaskManagerTest extends TestLogger {
final ExecutionAttemptID eid = new ExecutionAttemptID();
final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
- final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
- "TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(),
- TestInvokableCorrect.class.getName(),
- Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
- Collections.<InputGateDeploymentDescriptor>emptyList(),
- new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
+ final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, new AllocationID(),
+ "TestJob", vid, eid, executionConfig,
+ "TestTask", 7, 2, 7, 0, new Configuration(), new Configuration(),
+ TestInvokableCorrect.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
new Within(d) {
@@ -265,7 +267,7 @@ public class TaskManagerTest extends TestLogger {
final ExecutionAttemptID eid2 = new ExecutionAttemptID();
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
- jid1, "TestJob1", vid1, eid1,
+ jid1, new AllocationID(), "TestJob1", vid1, eid1,
new SerializedValue<>(new ExecutionConfig()),
"TestTask1", 5, 1, 5, 0,
new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
@@ -274,7 +276,7 @@ public class TaskManagerTest extends TestLogger {
new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
- jid2, "TestJob2", vid2, eid2,
+ jid2, new AllocationID(), "TestJob2", vid2, eid2,
new SerializedValue<>(new ExecutionConfig()),
"TestTask2", 7, 2, 7, 0,
new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
@@ -404,13 +406,13 @@ public class TaskManagerTest extends TestLogger {
final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
- final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig,
+ final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, new AllocationID(), "TestJob", vid1, eid1, executionConfig,
"TestTask1", 5, 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
- final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, executionConfig,
+ final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, new AllocationID(), "TestJob", vid2, eid2, executionConfig,
"TestTask2", 7, 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(),
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -531,7 +533,7 @@ public class TaskManagerTest extends TestLogger {
final ExecutionAttemptID eid2 = new ExecutionAttemptID();
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
- jid, "TestJob", vid1, eid1,
+ jid, new AllocationID(), "TestJob", vid1, eid1,
new SerializedValue<>(new ExecutionConfig()),
"Sender", 1, 0, 1, 0,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
@@ -540,7 +542,7 @@ public class TaskManagerTest extends TestLogger {
new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
- jid, "TestJob", vid2, eid2,
+ jid, new AllocationID(), "TestJob", vid2, eid2,
new SerializedValue<>(new ExecutionConfig()),
"Receiver", 7, 2, 7, 0,
new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
@@ -636,7 +638,7 @@ public class TaskManagerTest extends TestLogger {
);
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
- jid, "TestJob", vid1, eid1,
+ jid, new AllocationID(), "TestJob", vid1, eid1,
new SerializedValue<>(new ExecutionConfig()),
"Sender", 1, 0, 1, 0,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
@@ -644,7 +646,7 @@ public class TaskManagerTest extends TestLogger {
Collections.<URL>emptyList(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
- jid, "TestJob", vid2, eid2,
+ jid, new AllocationID(), "TestJob", vid2, eid2,
new SerializedValue<>(new ExecutionConfig()),
"Receiver", 7, 2, 7, 0,
new Configuration(), new Configuration(), Tasks.Receiver.class.getName(),
@@ -781,7 +783,7 @@ public class TaskManagerTest extends TestLogger {
);
final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
- jid, "TestJob", vid1, eid1,
+ jid, new AllocationID(), "TestJob", vid1, eid1,
new SerializedValue<>(new ExecutionConfig()),
"Sender", 1, 0, 1, 0,
new Configuration(), new Configuration(), Tasks.Sender.class.getName(),
@@ -789,7 +791,7 @@ public class TaskManagerTest extends TestLogger {
new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0);
final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
- jid, "TestJob", vid2, eid2,
+ jid, new AllocationID(), "TestJob", vid2, eid2,
new SerializedValue<>(new ExecutionConfig()),
"Receiver", 7, 2, 7, 0,
new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(),
@@ -929,7 +931,7 @@ public class TaskManagerTest extends TestLogger {
new InputGateDeploymentDescriptor(resultId, 0, icdd);
final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
- jid, "TestJob", vid, eid,
+ jid, new AllocationID(), "TestJob", vid, eid,
new SerializedValue<>(new ExecutionConfig()),
"Receiver", 1, 0, 1, 0,
new Configuration(), new Configuration(),
@@ -1022,7 +1024,7 @@ public class TaskManagerTest extends TestLogger {
new InputGateDeploymentDescriptor(resultId, 0, icdd);
final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
- jid, "TestJob", vid, eid,
+ jid, new AllocationID(), "TestJob", vid, eid,
new SerializedValue<>(new ExecutionConfig()),
"Receiver", 1, 0, 1, 0,
new Configuration(), new Configuration(),
@@ -1097,24 +1099,25 @@ public class TaskManagerTest extends TestLogger {
// Single blocking task
final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
- new JobID(),
- "Job",
- new JobVertexID(),
- new ExecutionAttemptID(),
- new SerializedValue<>(new ExecutionConfig()),
- "Task",
- 1,
- 0,
- 1,
- 0,
- new Configuration(),
- new Configuration(),
- Tasks.BlockingNoOpInvokable.class.getName(),
- Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
- Collections.<InputGateDeploymentDescriptor>emptyList(),
- Collections.<BlobKey>emptyList(),
- Collections.<URL>emptyList(),
- 0);
+ new JobID(),
+ new AllocationID(),
+ "Job",
+ new JobVertexID(),
+ new ExecutionAttemptID(),
+ new SerializedValue<>(new ExecutionConfig()),
+ "Task",
+ 1,
+ 0,
+ 1,
+ 0,
+ new Configuration(),
+ new Configuration(),
+ Tasks.BlockingNoOpInvokable.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0);
// Submit the task
new Within(d) {
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 5d3eb3a..7d466f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -69,7 +69,7 @@ public class TaskStopTest {
when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
when(tddMock.getInvokableClassName()).thenReturn("className");
- when(tddMock.getAllocationID()).thenReturn(mock(AllocationID.class));
+ when(tddMock.getAllocationId()).thenReturn(mock(AllocationID.class));
task = new Task(
tddMock,
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 50fc181..c5a9f2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -665,16 +666,18 @@ public class TaskTest {
}
return new TaskDeploymentDescriptor(
- new JobID(), "Test Job", new JobVertexID(), new ExecutionAttemptID(),
- execConfig,
- "Test Task", 1, 0, 1, 0,
- new Configuration(), new Configuration(),
- invokable.getName(),
- Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
- Collections.<InputGateDeploymentDescriptor>emptyList(),
- Collections.<BlobKey>emptyList(),
- Collections.<URL>emptyList(),
- 0);
+ new JobID(),
+ new AllocationID(),
+ "Test Job", new JobVertexID(), new ExecutionAttemptID(),
+ execConfig,
+ "Test Task", 1, 0, 1, 0,
+ new Configuration(), new Configuration(),
+ invokable.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 1077052..f095cf4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -139,21 +140,22 @@ public class InterruptSensitiveRestoreTest {
keyGroupStateFromStream);
return new TaskDeploymentDescriptor(
- new JobID(),
- "test job name",
- new JobVertexID(),
- new ExecutionAttemptID(),
- new SerializedValue<>(new ExecutionConfig()),
- "test task name",
- 1, 0, 1, 0,
- new Configuration(),
- taskConfig,
- SourceStreamTask.class.getName(),
- Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
- Collections.<InputGateDeploymentDescriptor>emptyList(),
- Collections.<BlobKey>emptyList(),
- Collections.<URL>emptyList(),
- 0,
+ new JobID(),
+ new AllocationID(),
+ "test job name",
+ new JobVertexID(),
+ new ExecutionAttemptID(),
+ new SerializedValue<>(new ExecutionConfig()),
+ "test task name",
+ 1, 0, 1, 0,
+ new Configuration(),
+ taskConfig,
+ SourceStreamTask.class.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0,
taskStateHandles);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/83f290a9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index bb246f9..52332a2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -220,17 +221,19 @@ public class StreamTaskTest {
.thenReturn(mock(TaskKvStateRegistry.class));
TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
- new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
- new SerializedValue<>(new ExecutionConfig()),
- "Test Task", 1, 0, 1, 0,
- new Configuration(),
- taskConfig.getConfiguration(),
- invokable.getName(),
- Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
- Collections.<InputGateDeploymentDescriptor>emptyList(),
- Collections.<BlobKey>emptyList(),
- Collections.<URL>emptyList(),
- 0);
+ new JobID(),
+ new AllocationID(),
+ "Job Name", new JobVertexID(), new ExecutionAttemptID(),
+ new SerializedValue<>(new ExecutionConfig()),
+ "Test Task", 1, 0, 1, 0,
+ new Configuration(),
+ taskConfig.getConfiguration(),
+ invokable.getName(),
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList(),
+ 0);
return new Task(
tdd,