You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/22 23:51:06 UTC
[3/4] flink git commit: [FLINK-9406] Use equals in JobMaster#requestPartitionState
[FLINK-9406] Use equals in JobMaster#requestPartitionState
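Use equals() instead of reference equality (==) in JobMaster#requestPartitionState
when comparing the producer execution's attempt id with the producer id of the
requested result partition, so that an equal but distinct ExecutionAttemptID
instance is recognized as the same producer.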
This closes #6057.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a62683e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a62683e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a62683e7
Branch: refs/heads/release-1.5
Commit: a62683e782d09a3b1b3fc35009f3444128727d9f
Parents: 37b630e
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 23 00:47:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 23 00:59:50 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmaster/JobMaster.java | 2 +-
.../flink/runtime/jobmaster/JobMasterTest.java | 94 ++++++++++++++++++++
2 files changed, 95 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a62683e7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1df9d89..aad81a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -619,7 +619,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
.getProducer()
.getCurrentExecutionAttempt();
- if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) {
+ if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
return CompletableFuture.completedFuture(producerExecution.getState());
} else {
return FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionId));
http://git-wip-us.apache.org/repos/asf/flink/blob/a62683e7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 99cdc16..552c0f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -38,14 +38,22 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -67,6 +75,7 @@ import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -96,6 +105,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@@ -615,6 +625,90 @@ public class JobMasterTest extends TestLogger {
}
}
+ /**
+ * Tests that {@link JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)} reports
+ * {@code FINISHED} for a finished producer when queried with a distinct copy of its {@link ExecutionAttemptID}.
+ */
+ @Test
+ public void testRequestPartitionState() throws Exception {
+ final JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
+ final JobMaster jobMaster = createJobMaster(
+ JobMasterConfiguration.fromConfiguration(configuration),
+ producerConsumerJobGraph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder().build(),
+ heartbeatServices);
+
+ jobMaster.start(jobMasterId, testingTimeout);
+
+ try {
+ final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+ final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+ testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); // capture the allocation id of the incoming slot request
+
+ rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
+
+ final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>();
+ final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+ .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+ tddFuture.complete(taskDeploymentDescriptor); // only the first submitted descriptor is kept; later complete() calls are no-ops
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .createTestingTaskExecutorGateway();
+ rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway);
+
+ final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+ rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); // NOTE(review): presumably announces the testing resource manager as leader - confirm
+
+ final AllocationID allocationId = allocationIdFuture.get(); // blocks until a slot request has been captured
+
+ final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();
+
+ final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); // offer a slot under the captured allocation id
+
+ final Collection<SlotOffer> slotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get();
+
+ assertThat(slotOffers, hasSize(1));
+ assertThat(slotOffers, contains(slotOffer));
+
+ // wait for the submitted task deployment descriptor; it supplies the produced partition and the attempt id
+ final TaskDeploymentDescriptor tdd = tddFuture.get();
+
+ assertThat(tdd.getProducedPartitions(), hasSize(1));
+ final ResultPartitionDeploymentDescriptor partition = tdd.getProducedPartitions().iterator().next();
+
+ final ExecutionAttemptID executionAttemptId = tdd.getExecutionAttemptId();
+ final ExecutionAttemptID copiedExecutionAttemptId = new ExecutionAttemptID(executionAttemptId.getLowerPart(), executionAttemptId.getUpperPart()); // distinct instance built from the same lower/upper parts
+
+ // report the producer task as FINISHED using the original attempt id
+ jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, ExecutionState.FINISHED)).get();
+
+ // request the producer's partition state with the copied attempt id; a reference comparison (==) cannot match this copy
+ final CompletableFuture<ExecutionState> partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId));
+
+ assertThat(partitionStateFuture.get(), equalTo(ExecutionState.FINISHED));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); // always terminate the endpoint, even when an assertion fails
+ }
+ }
+
+ private JobGraph producerConsumerJobGraph() { // builds a two-vertex job graph: "Producer" connected to "Consumer"
+ final JobVertex producer = new JobVertex("Producer");
+ producer.setInvokableClass(NoOpInvokable.class);
+ final JobVertex consumer = new JobVertex("Consumer");
+ consumer.setInvokableClass(NoOpInvokable.class);
+
+ consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); // the consumer takes a new data set of the producer as input (POINTWISE pattern, BLOCKING partition type)
+
+ final JobGraph jobGraph = new JobGraph(producer, consumer);
+ jobGraph.setAllowQueuedScheduling(true);
+
+ return jobGraph;
+ }
+
private File createSavepoint(long savepointId) throws IOException {
final File savepointFile = temporaryFolder.newFile();
final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());