Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/22 23:51:06 UTC

[3/4] flink git commit: [FLINK-9406] Use equals in JobMaster#requestPartitionState

[FLINK-9406] Use equals in JobMaster#requestPartitionState

Use equals instead of referential equality in JobMaster#requestPartitionState
when comparing the producerExecution attempt id with the result partition
producer id.

This closes #6057.
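
For context, a minimal standalone sketch (not part of this commit) of the failure mode the fix addresses: an ExecutionAttemptID carried inside a ResultPartitionID that arrives over RPC is a different object instance than the ID held by the ExecutionGraph, so a reference comparison with == can return false even though both IDs wrap the same value. Rebuilding the ID from its lower/upper parts below mirrors what the new JobMasterTest does; the no-arg ExecutionAttemptID constructor is assumed to generate a fresh random ID as in Flink 1.5.

    import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

    public class AttemptIdEqualityExample {
        public static void main(String[] args) {
            // ID as it exists in the ExecutionGraph on the JobMaster side
            final ExecutionAttemptID original = new ExecutionAttemptID();

            // Rebuild an ID with the same lower/upper parts, simulating an ID
            // that travelled through an RPC call and was deserialized anew
            final ExecutionAttemptID copy =
                new ExecutionAttemptID(original.getLowerPart(), original.getUpperPart());

            System.out.println(original == copy);       // false: different instances
            System.out.println(original.equals(copy));  // true: same underlying value
        }
    }

With the old == check, requestPartitionState would wrongly report the producer as disposed in this situation; comparing with equals() makes the check depend on the ID value rather than object identity.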


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a62683e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a62683e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a62683e7

Branch: refs/heads/release-1.5
Commit: a62683e782d09a3b1b3fc35009f3444128727d9f
Parents: 37b630e
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 23 00:47:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 23 00:59:50 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      |  2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  | 94 ++++++++++++++++++++
 2 files changed, 95 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a62683e7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1df9d89..aad81a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -619,7 +619,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 						.getProducer()
 						.getCurrentExecutionAttempt();
 
-				if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) {
+				if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
 					return CompletableFuture.completedFuture(producerExecution.getState());
 				} else {
 					return FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionId));

http://git-wip-us.apache.org/repos/asf/flink/blob/a62683e7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 99cdc16..552c0f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -38,14 +38,22 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -67,6 +75,7 @@ import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -96,6 +105,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
@@ -615,6 +625,90 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests the {@link JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
+	 * call for a finished result partition.
+	 */
+	@Test
+	public void testRequestPartitionState() throws Exception {
+		final JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
+		final JobMaster jobMaster = createJobMaster(
+			JobMasterConfiguration.fromConfiguration(configuration),
+			producerConsumerJobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder().build(),
+			heartbeatServices);
+
+		jobMaster.start(jobMasterId, testingTimeout);
+
+		try {
+			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+			final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+			testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+			rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
+
+			final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>();
+			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+				.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+                    tddFuture.complete(taskDeploymentDescriptor);
+                    return CompletableFuture.completedFuture(Acknowledge.get());
+                })
+				.createTestingTaskExecutorGateway();
+			rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway);
+
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+			rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID());
+
+			final AllocationID allocationId = allocationIdFuture.get();
+
+			final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get();
+
+			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+
+			final Collection<SlotOffer> slotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get();
+
+			assertThat(slotOffers, hasSize(1));
+			assertThat(slotOffers, contains(slotOffer));
+
+			// obtain tdd for the result partition ids
+			final TaskDeploymentDescriptor tdd = tddFuture.get();
+
+			assertThat(tdd.getProducedPartitions(), hasSize(1));
+			final ResultPartitionDeploymentDescriptor partition = tdd.getProducedPartitions().iterator().next();
+
+			final ExecutionAttemptID executionAttemptId = tdd.getExecutionAttemptId();
+			final ExecutionAttemptID copiedExecutionAttemptId = new ExecutionAttemptID(executionAttemptId.getLowerPart(), executionAttemptId.getUpperPart());
+
+			// finish the producer task
+			jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, ExecutionState.FINISHED)).get();
+
+			// request the state of the result partition of the producer
+			final CompletableFuture<ExecutionState> partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId));
+
+			assertThat(partitionStateFuture.get(), equalTo(ExecutionState.FINISHED));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
+	private JobGraph producerConsumerJobGraph() {
+		final JobVertex producer = new JobVertex("Producer");
+		producer.setInvokableClass(NoOpInvokable.class);
+		final JobVertex consumer = new JobVertex("Consumer");
+		consumer.setInvokableClass(NoOpInvokable.class);
+
+		consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+		final JobGraph jobGraph = new JobGraph(producer, consumer);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
 	private File createSavepoint(long savepointId) throws IOException {
 		final File savepointFile = temporaryFolder.newFile();
 		final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());