You are viewing a plain-text version of this content. The hyperlink to the canonical version (originally labelled "here") was not preserved in this rendering.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/01 11:33:05 UTC

[flink] branch master updated (726d9e4 -> 6f6cfde)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 726d9e4  [FLINK-12758][ml] Add flink-ml-lib module
     new a853cda  [FLINK-13001][tests] Add TestingExecutionGraphBuilder
     new 6f6cfde  [FLINK-12997][coordination] Release partitions on vertex reset

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. Revisions
listed as "add" (there are none in this push) were already present
in the repository and have only been added to this reference.


Summary of changes:
 .../flink/runtime/executiongraph/Execution.java    |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java    |   4 +
 .../executiongraph/ExecutionGraphTestUtils.java    | 177 +++++++++++++++++----
 .../executiongraph/ExecutionVertexTest.java        |  86 ++++++++++
 4 files changed, 239 insertions(+), 30 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java


[flink] 01/02: [FLINK-13001][tests] Add TestingExecutionGraphBuilder

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a853cdab96435a1cf978ec13fb4bf484e2c0e553
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 26 13:41:51 2019 +0200

    [FLINK-13001][tests] Add TestingExecutionGraphBuilder
---
 .../executiongraph/ExecutionGraphTestUtils.java    | 177 +++++++++++++++++----
 1 file changed, 148 insertions(+), 29 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index f614795..8623eda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,15 +18,19 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -34,6 +38,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -41,12 +46,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
-import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +63,7 @@ import java.lang.reflect.Field;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
@@ -397,23 +404,14 @@ public class ExecutionGraphTestUtils {
 		checkNotNull(vertices);
 		checkNotNull(timeout);
 
-		return ExecutionGraphBuilder.buildGraph(
-			null,
-			new JobGraph(jid, "test job", vertices),
-			new Configuration(),
-			executor,
-			executor,
-			slotProvider,
-			ExecutionGraphTestUtils.class.getClassLoader(),
-			new StandaloneCheckpointRecoveryFactory(),
-			timeout,
-			restartStrategy,
-			new UnregisteredMetricsGroup(),
-			VoidBlobWriter.getInstance(),
-			timeout,
-			TEST_LOGGER,
-			NettyShuffleMaster.INSTANCE,
-			NoOpPartitionTracker.INSTANCE);
+		return new TestingExecutionGraphBuilder(vertices)
+			.setFutureExecutor(executor)
+			.setIoExecutor(executor)
+			.setSlotProvider(slotProvider)
+			.setAllocationTimeout(timeout)
+			.setRpcTimeout(timeout)
+			.setRestartStrategy(restartStrategy)
+			.build();
 	}
 
 	public static JobVertex createNoOpVertex(int parallelism) {
@@ -445,16 +443,10 @@ public class ExecutionGraphTestUtils {
 		JobVertex ajv = new JobVertex("TestVertex", id);
 		ajv.setInvokableClass(AbstractInvokable.class);
 
-		ExecutionGraph graph = new ExecutionGraph(
-			executor,
-			executor,
-			new JobID(), 
-			"test job", 
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy(),
-			new TestingSlotProvider(ignored -> new CompletableFuture<>()));
+		ExecutionGraph graph = new TestingExecutionGraphBuilder(ajv)
+			.setIoExecutor(executor)
+			.setFutureExecutor(executor)
+			.build();
 
 		graph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
 
@@ -537,4 +529,131 @@ public class ExecutionGraphTestUtils {
 			subtaskIndex++;
 		}
 	}
+
+	/**
+	 * Builder for {@link ExecutionGraph}.
+	 */
+	public static class TestingExecutionGraphBuilder {
+
+		private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+		private Time allocationTimeout = Time.seconds(10L);
+		private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+		private MetricGroup metricGroup = new UnregisteredMetricsGroup();
+		private RestartStrategy restartStrategy = new NoRestartStrategy();
+		private Time rpcTimeout = AkkaUtils.getDefaultTimeout();
+		private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
+		private ClassLoader classLoader = getClass().getClassLoader();
+		private SlotProvider slotProvider = new TestingSlotProvider(slotRequestId -> CompletableFuture.completedFuture(new TestingLogicalSlot()));
+		private Executor ioExecutor = TestingUtils.defaultExecutor();
+		private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
+		private Configuration jobMasterConfig = new Configuration();
+		private JobGraph jobGraph;
+		private PartitionTracker partitionTracker = NoOpPartitionTracker.INSTANCE;
+
+		public TestingExecutionGraphBuilder(final JobVertex ... jobVertices) {
+			this(new JobID(), "test job", jobVertices);
+		}
+
+		public TestingExecutionGraphBuilder(final JobID jobId, final JobVertex ... jobVertices) {
+			this(jobId, "test job", jobVertices);
+		}
+
+		public TestingExecutionGraphBuilder(final String jobName, final JobVertex ... jobVertices) {
+			this(new JobID(), jobName, jobVertices);
+		}
+
+		public TestingExecutionGraphBuilder(final JobID jobId, final String jobName, final JobVertex ... jobVertices) {
+			this(new JobGraph(jobId, jobName, jobVertices));
+		}
+
+		public TestingExecutionGraphBuilder(final JobGraph jobGraph) {
+			this.jobGraph = jobGraph;
+		}
+
+		public TestingExecutionGraphBuilder setJobMasterConfig(final Configuration jobMasterConfig) {
+			this.jobMasterConfig = jobMasterConfig;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setFutureExecutor(final ScheduledExecutorService futureExecutor) {
+			this.futureExecutor = futureExecutor;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setIoExecutor(final Executor ioExecutor) {
+			this.ioExecutor = ioExecutor;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setSlotProvider(final SlotProvider slotProvider) {
+			this.slotProvider = slotProvider;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setClassLoader(final ClassLoader classLoader) {
+			this.classLoader = classLoader;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setCheckpointRecoveryFactory(final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+			this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setRpcTimeout(final Time rpcTimeout) {
+			this.rpcTimeout = rpcTimeout;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setRestartStrategy(final RestartStrategy restartStrategy) {
+			this.restartStrategy = restartStrategy;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setMetricGroup(final MetricGroup metricGroup) {
+			this.metricGroup = metricGroup;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setBlobWriter(final BlobWriter blobWriter) {
+			this.blobWriter = blobWriter;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setAllocationTimeout(final Time allocationTimeout) {
+			this.allocationTimeout = allocationTimeout;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setShuffleMaster(final ShuffleMaster<?> shuffleMaster) {
+			this.shuffleMaster = shuffleMaster;
+			return this;
+		}
+
+		public TestingExecutionGraphBuilder setPartitionTracker(final PartitionTracker partitionTracker) {
+			this.partitionTracker = partitionTracker;
+			return this;
+		}
+
+		public ExecutionGraph build() throws JobException, JobExecutionException {
+			return ExecutionGraphBuilder.buildGraph(
+				null,
+				jobGraph,
+				jobMasterConfig,
+				futureExecutor,
+				ioExecutor,
+				slotProvider,
+				classLoader,
+				checkpointRecoveryFactory,
+				rpcTimeout,
+				restartStrategy,
+				metricGroup,
+				blobWriter,
+				allocationTimeout,
+				TEST_LOGGER,
+				shuffleMaster,
+				partitionTracker
+			);
+		}
+	}
 }


[flink] 02/02: [FLINK-12997][coordination] Release partitions on vertex reset

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f6cfde0cea12f8d6966c48ba3124c53db15c9bc
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 26 13:41:51 2019 +0200

    [FLINK-12997][coordination] Release partitions on vertex reset
---
 .../flink/runtime/executiongraph/Execution.java    |  2 +-
 .../runtime/executiongraph/ExecutionVertex.java    |  4 +
 .../executiongraph/ExecutionVertexTest.java        | 86 ++++++++++++++++++++++
 3 files changed, 91 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c033304..4939e61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -1335,7 +1335,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	private void stopTrackingAndReleasePartitions() {
+	void stopTrackingAndReleasePartitions() {
 		LOG.info("Discarding the results produced by task execution {}.", attemptId);
 		if (producedPartitions != null && producedPartitions.size() > 0) {
 			final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0d01e25..a12d198 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -591,6 +591,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			final ExecutionState oldState = oldExecution.getState();
 
 			if (oldState.isTerminal()) {
+				if (oldState == FINISHED) {
+					oldExecution.stopTrackingAndReleasePartitions();
+				}
+
 				priorExecutions.add(oldExecution.archive());
 
 				final Execution newExecution = new Execution(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
new file mode 100644
index 0000000..c9f3ef0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for the {@link ExecutionVertex}.
+ */
+public class ExecutionVertexTest extends TestLogger {
+
+	@Test
+	public void testResetForNewExecutionReleasesPartitions() throws Exception {
+		final JobVertex producerJobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+		final JobVertex consumerJobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
+
+		consumerJobVertex.connectNewDataSetAsInput(producerJobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+		final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
+		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
+		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete);
+
+		final ExecutionGraph executionGraph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(producerJobVertex, consumerJobVertex)
+			.setPartitionTracker(partitionTracker)
+			.build();
+
+		executionGraph.scheduleForExecution();
+
+		final ExecutionJobVertex producerExecutionJobVertex = executionGraph.getJobVertex(producerJobVertex.getID());
+
+		Execution execution = producerExecutionJobVertex
+			.getTaskVertices()[0]
+			.getCurrentExecutionAttempt();
+
+		assertFalse(releasePartitionsFuture.isDone());
+
+		execution.markFinished();
+
+		assertFalse(releasePartitionsFuture.isDone());
+
+		producerExecutionJobVertex.resetForNewExecution(1L, 1L);
+
+		final IntermediateResultPartitionID intermediateResultPartitionID = producerExecutionJobVertex
+			.getProducedDataSets()[0]
+			.getPartitions()[0]
+			.getPartitionId();
+		final ResultPartitionID resultPartitionID = execution
+			.getResultPartitionDeploymentDescriptor(intermediateResultPartitionID)
+			.get()
+			.getShuffleDescriptor()
+			.getResultPartitionID();
+
+		assertThat(releasePartitionsFuture.get(), contains(resultPartitionID));
+	}
+}