Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/05 14:03:05 UTC

[flink] branch master updated (8176d58 -> 2f80cfd)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8176d58  [FLINK-24740][testinfrastructure] Update testcontainers dependency to the latest version
     new d4a0251  [FLINK-24749] Add CachingSupplier
     new 2f80cfd  [FLINK-24749][coordination] Reuse CheckpointStatsTracker

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/util/function/CachingSupplier.java       |  27 ++++--
 .../flink/util/function/CachingSupplierTest.java   |  24 +++--
 .../DefaultExecutionGraphBuilder.java              |  13 +--
 .../scheduler/DefaultExecutionGraphFactory.java    |  15 ++-
 .../TestingDefaultExecutionGraphBuilder.java       |   4 +-
 .../adaptive/AdaptiveSchedulerClusterITCase.java   | 104 +++++++++++++++++++++
 6 files changed, 157 insertions(+), 30 deletions(-)
 copy flink-python/src/main/java/org/apache/flink/api/common/python/pickle/ByteArrayConstructor.java => flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java (56%)
 copy flink-streaming-scala/src/test/java/org/apache/flink/streaming/util/typeutils/DefaultScalaProductFieldAccessorFactoryTest.java => flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java (57%)
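
Taken together, the two commits below move construction of the CheckpointStatsTracker out of DefaultExecutionGraphBuilder and behind a Supplier owned by DefaultExecutionGraphFactory. Because that supplier is the CachingSupplier added in the first commit, every execution graph built by the same factory reuses one tracker instance, so checkpoint statistics survive execution graph rebuilds such as adaptive scheduler rescales. A reduced sketch of that wiring, assuming the CachingSupplier class from these commits is on the classpath; FakeStatsTracker and buildGraph are hypothetical stand-ins, and only the CachingSupplier usage mirrors the actual change:

    import org.apache.flink.util.function.CachingSupplier;

    import java.util.function.Supplier;

    public class TrackerReuseSketch {

        // Hypothetical stand-in for CheckpointStatsTracker.
        static class FakeStatsTracker {
            int reportedCheckpoints;
        }

        // Mirrors DefaultExecutionGraphFactory: the supplier is built once in the
        // constructor, so every graph created by this factory sees the same tracker.
        private final Supplier<FakeStatsTracker> trackerFactory =
                new CachingSupplier<>(FakeStatsTracker::new);

        // Mirrors DefaultExecutionGraphBuilder, which now asks the supplier for the
        // tracker instead of instantiating one per build.
        FakeStatsTracker buildGraph() {
            return trackerFactory.get();
        }

        public static void main(String[] args) {
            TrackerReuseSketch factory = new TrackerReuseSketch();

            FakeStatsTracker firstBuild = factory.buildGraph();
            firstBuild.reportedCheckpoints = 5;

            // A second build (e.g. after a rescale) returns the same tracker,
            // so previously recorded statistics remain visible.
            FakeStatsTracker secondBuild = factory.buildGraph();
            System.out.println(firstBuild == secondBuild);       // true
            System.out.println(secondBuild.reportedCheckpoints); // 5
        }
    }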

[flink] 02/02: [FLINK-24749][coordination] Reuse CheckpointStatsTracker

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2f80cfde34749e882f2171bb9ef070feb1624847
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Nov 2 16:58:53 2021 +0100

    [FLINK-24749][coordination] Reuse CheckpointStatsTracker
---
 .../DefaultExecutionGraphBuilder.java              |  13 +--
 .../scheduler/DefaultExecutionGraphFactory.java    |  15 ++-
 .../TestingDefaultExecutionGraphBuilder.java       |   4 +-
 .../adaptive/AdaptiveSchedulerClusterITCase.java   | 104 +++++++++++++++++++++
 4 files changed, 125 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 4a61dd3..d1c6d6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -64,6 +63,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -93,7 +93,8 @@ public class DefaultExecutionGraphBuilder {
             ExecutionStateUpdateListener executionStateUpdateListener,
             long initializationTimestamp,
             VertexAttemptNumberStore vertexAttemptNumberStore,
-            VertexParallelismStore vertexParallelismStore)
+            VertexParallelismStore vertexParallelismStore,
+            Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory)
             throws JobExecutionException, JobException {
 
         checkNotNull(jobGraph, "job graph cannot be null");
@@ -204,12 +205,6 @@ public class DefaultExecutionGraphBuilder {
         if (isCheckpointingEnabled(jobGraph)) {
             JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
 
-            // Maximum number of remembered checkpoints
-            int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
-
-            CheckpointStatsTracker checkpointStatsTracker =
-                    new CheckpointStatsTracker(historySize, metrics);
-
             // load the state backend from the application settings
             final StateBackend applicationConfiguredBackend;
             final SerializedValue<StateBackend> serializedAppConfigured =
@@ -316,7 +311,7 @@ public class DefaultExecutionGraphBuilder {
                     completedCheckpointStore,
                     rootBackend,
                     rootStorage,
-                    checkpointStatsTracker,
+                    checkpointStatsTrackerFactory.get(),
                     checkpointsCleaner);
         }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index f1721c7..0f49909 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
@@ -38,12 +40,14 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
 import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.function.CachingSupplier;
 
 import org.slf4j.Logger;
 
 import java.util.HashSet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
 
 /** Default {@link ExecutionGraphFactory} implementation. */
 public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
@@ -58,6 +62,7 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
     private final BlobWriter blobWriter;
     private final ShuffleMaster<?> shuffleMaster;
     private final JobMasterPartitionTracker jobMasterPartitionTracker;
+    private final Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory;
 
     public DefaultExecutionGraphFactory(
             Configuration configuration,
@@ -80,6 +85,13 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
         this.blobWriter = blobWriter;
         this.shuffleMaster = shuffleMaster;
         this.jobMasterPartitionTracker = jobMasterPartitionTracker;
+        this.checkpointStatsTrackerFactory =
+                new CachingSupplier<>(
+                        () ->
+                                new CheckpointStatsTracker(
+                                        configuration.getInteger(
+                                                WebOptions.CHECKPOINTS_HISTORY_SIZE),
+                                        jobManagerJobMetricGroup));
     }
 
     @Override
@@ -124,7 +136,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
                         executionStateUpdateListener,
                         initializationTimestamp,
                         vertexAttemptNumberStore,
-                        vertexParallelismStore);
+                        vertexParallelismStore,
+                        checkpointStatsTrackerFactory);
 
         final CheckpointCoordinator checkpointCoordinator =
                 newExecutionGraph.getCheckpointCoordinator();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index bde0cbb..edf85c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
@@ -185,6 +186,7 @@ public class TestingDefaultExecutionGraphBuilder {
                 System.currentTimeMillis(),
                 new DefaultVertexAttemptNumberStore(),
                 Optional.ofNullable(vertexParallelismStore)
-                        .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)));
+                        .orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)),
+                () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup()));
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
index 9865395..4d1d7e44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.java
@@ -25,11 +25,20 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
@@ -44,8 +53,14 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -76,6 +91,8 @@ public class AdaptiveSchedulerClusterITCase extends TestLogger {
         configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
         configuration.set(
                 JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(100L));
+        // required for #testCheckpointStatsPersistedAcrossRescale
+        configuration.set(WebOptions.CHECKPOINTS_HISTORY_SIZE, Integer.MAX_VALUE);
 
         return configuration;
     }
@@ -141,6 +158,93 @@ public class AdaptiveSchedulerClusterITCase extends TestLogger {
         assertTrue(jobResultFuture.join().isSuccess());
     }
 
+    @Test
+    public void testCheckpointStatsPersistedAcrossRescale() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+
+        JobVertex jobVertex = new JobVertex("jobVertex", JOB_VERTEX_ID);
+        jobVertex.setInvokableClass(CheckpointingNoOpInvokable.class);
+        jobVertex.setParallelism(PARALLELISM);
+
+        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
+        jobGraph.setSnapshotSettings(
+                new JobCheckpointingSettings(
+                        CheckpointCoordinatorConfiguration.builder()
+                                .setCheckpointInterval(100)
+                                .setCheckpointTimeout(1000)
+                                .build(),
+                        null));
+
+        miniCluster.submitJob(jobGraph).join();
+
+        // wait until some checkpoints have been completed
+        CommonTestUtils.waitUntilCondition(
+                () ->
+                        miniCluster
+                                .getExecutionGraph(jobGraph.getJobID())
+                                .thenApply(
+                                        eg ->
+                                                eg.getCheckpointStatsSnapshot()
+                                                                .getCounts()
+                                                                .getNumberOfCompletedCheckpoints()
+                                                        > 0)
+                                .get(),
+                Deadline.fromNow(Duration.ofHours(1)));
+
+        miniCluster.terminateTaskManager(0);
+
+        waitUntilParallelismForVertexReached(
+                jobGraph.getJobID(),
+                JOB_VERTEX_ID,
+                NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS - 1));
+
+        // check that the very first checkpoint is still accessible
+        final List<AbstractCheckpointStats> checkpointHistory =
+                miniCluster
+                        .getExecutionGraph(jobGraph.getJobID())
+                        .thenApply(
+                                eg -> eg.getCheckpointStatsSnapshot().getHistory().getCheckpoints())
+                        .get();
+        assertThat(checkpointHistory.get(checkpointHistory.size() - 1).getCheckpointId(), is(1L));
+    }
+
+    /** An invokable that doesn't do anything interesting, but does support checkpointing. */
+    public static class CheckpointingNoOpInvokable extends AbstractInvokable {
+
+        private static final long CANCEL_SIGNAL = -2L;
+        private final BlockingQueue<Long> checkpointsToConfirm = new ArrayBlockingQueue<>(1);
+
+        public CheckpointingNoOpInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            long signal = checkpointsToConfirm.take();
+            while (signal != CANCEL_SIGNAL) {
+                getEnvironment().acknowledgeCheckpoint(signal, new CheckpointMetrics());
+                signal = checkpointsToConfirm.take();
+            }
+        }
+
+        @Override
+        public void cancel() throws Exception {
+            checkpointsToConfirm.add(CANCEL_SIGNAL);
+        }
+
+        @Override
+        public CompletableFuture<Boolean> triggerCheckpointAsync(
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
+            checkpointsToConfirm.add(checkpointMetaData.getCheckpointId());
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
     private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
         final JobVertex blockingOperator = new JobVertex("Blocking operator", JOB_VERTEX_ID);
 
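The CheckpointingNoOpInvokable above coordinates the checkpoint-trigger thread and the task thread through a single-slot blocking queue plus a sentinel value for cancellation. The same pattern in isolation, as a minimal sketch with illustrative names (none of this is Flink API):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class SentinelQueueSketch {

        private static final long CANCEL_SIGNAL = -2L;
        private static final BlockingQueue<Long> signals = new ArrayBlockingQueue<>(1);

        public static void main(String[] args) throws Exception {
            // "Task" thread: acknowledge every checkpoint id until the sentinel
            // arrives, just like CheckpointingNoOpInvokable#invoke.
            Thread task =
                    new Thread(
                            () -> {
                                try {
                                    long signal = signals.take();
                                    while (signal != CANCEL_SIGNAL) {
                                        System.out.println("acknowledged checkpoint " + signal);
                                        signal = signals.take();
                                    }
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                }
                            });
            task.start();

            signals.put(1L);            // triggerCheckpointAsync(1)
            signals.put(2L);            // triggerCheckpointAsync(2)
            signals.put(CANCEL_SIGNAL); // cancel()
            task.join();
        }
    }

Using a sentinel value instead of interrupting the task keeps cancellation on the same code path as a normal checkpoint acknowledgement.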

[flink] 01/02: [FLINK-24749] Add CachingSupplier

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4a0251a08ae1d5f931ab24dc898faea16e2a9a2
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Nov 3 13:59:26 2021 +0100

    [FLINK-24749] Add CachingSupplier
---
 .../flink/util/function/CachingSupplier.java       | 42 ++++++++++++++++++++++
 .../flink/util/function/CachingSupplierTest.java   | 39 ++++++++++++++++++++
 2 files changed, 81 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java b/flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java
new file mode 100644
index 0000000..d1bfce1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/function/CachingSupplier.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.function.Supplier;
+
+/** A {@link Supplier} that returns a single, lazily instantiated, value. */
+@NotThreadSafe
+public class CachingSupplier<T> implements Supplier<T> {
+    private final Supplier<T> backingSupplier;
+    private @Nullable T cachedValue;
+
+    public CachingSupplier(Supplier<T> backingSupplier) {
+        this.backingSupplier = backingSupplier;
+    }
+
+    @Override
+    public T get() {
+        if (cachedValue == null) {
+            cachedValue = backingSupplier.get();
+        }
+        return cachedValue;
+    }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java b/flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java
new file mode 100644
index 0000000..8826887
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/function/CachingSupplierTest.java
@@ -0,0 +1,39 @@
+package org.apache.flink.util.function;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+class CachingSupplierTest {
+
+    @Test
+    void testCaching() {
+        final AtomicInteger instantiationCounts = new AtomicInteger();
+        final Supplier<Integer> backingSupplier = () -> instantiationCounts.incrementAndGet();
+        final CachingSupplier<Integer> cachingSupplier = new CachingSupplier<>(backingSupplier);
+
+        assertThat(cachingSupplier.get(), is(cachingSupplier.get()));
+        assertThat(instantiationCounts.get(), is(1));
+    }
+}
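
For reference, CachingSupplier defers the call to the backing supplier until the first get() and then serves the cached value on every later call. A minimal standalone usage sketch, assuming the class above is on the classpath:

    import org.apache.flink.util.function.CachingSupplier;

    import java.util.function.Supplier;

    public class CachingSupplierUsage {

        public static void main(String[] args) {
            final int[] creations = {0};

            final Supplier<String> cached =
                    new CachingSupplier<>(
                            () -> {
                                creations[0]++;
                                return "value-" + creations[0];
                            });

            // Nothing has been created yet: instantiation is deferred to the first get().
            System.out.println(creations[0]); // 0

            System.out.println(cached.get()); // value-1, backing supplier runs once
            System.out.println(cached.get()); // value-1 again, served from the cache
            System.out.println(creations[0]); // 1

            // Note: the class is @NotThreadSafe, so concurrent first calls to get()
            // could invoke the backing supplier more than once.
        }
    }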