You are viewing a plain-text version of this content. The canonical link to it (a hyperlink labeled "here" in the original page) is not preserved in this plain-text version.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/22 21:37:53 UTC

[flink] branch master updated (81dcecf -> 60f2f3c)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 81dcecf  [FLINK-24113][tests] Reduce duplication in ApplicationDispatcherBootstrapTest
     new f35a76c  [FLINK-24775][coordination] move JobStatus-related metrics out of ExecutionGraph
     new fc37f0f  [hotfix] Remove MetricGroup parameter from EG builder
     new 60f2f3c  [FLINK-24903][coordination] Harden AdaptiveSchedulerTest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../DefaultExecutionGraphBuilder.java              |  11 --
 .../scheduler/DefaultExecutionGraphFactory.java    |   1 -
 .../flink/runtime/scheduler/JobStatusStore.java    |  50 ++++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |  20 ++-
 .../scheduler/adaptive/AdaptiveScheduler.java      |  29 +++--
 .../TestingDefaultExecutionGraphBuilder.java       |   8 --
 .../runtime/scheduler/JobStatusStoreTest.java      |  66 ++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 137 +++++++++++++++++++--
 8 files changed, 270 insertions(+), 52 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java

[flink] 01/03: [FLINK-24775][coordination] move JobStatus-related metrics out of ExecutionGraph

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f35a76c1e96a4cce51a7a5d335128769f0e196cc
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Nov 4 15:12:28 2021 +0100

    [FLINK-24775][coordination] move JobStatus-related metrics out of ExecutionGraph
---
 .../DefaultExecutionGraphBuilder.java              |   9 --
 .../flink/runtime/scheduler/JobStatusStore.java    |  50 ++++++++++
 .../flink/runtime/scheduler/SchedulerBase.java     |  20 +++-
 .../scheduler/adaptive/AdaptiveScheduler.java      |  29 +++---
 .../runtime/scheduler/JobStatusStoreTest.java      |  66 ++++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 111 +++++++++++++++++++--
 6 files changed, 251 insertions(+), 34 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index d1c6d6f..ff1af8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -37,9 +37,6 @@ import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategyFactoryLoader;
-import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -315,12 +312,6 @@ public class DefaultExecutionGraphBuilder {
                     checkpointsCleaner);
         }
 
-        // create all the metrics for the Execution Graph
-
-        metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
-        metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
-        metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
-
         return executionGraph;
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java
new file mode 100644
index 0000000..36fccb8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.JobStatusProvider;
+
+/** Listens for and exposes the current job state and state timestamps. */
+public class JobStatusStore implements JobStatusListener, JobStatusProvider {
+
+    private final long[] stateTimestamps = new long[JobStatus.values().length];
+    private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+    public JobStatusStore(long initializationTimestamp) {
+        stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+    }
+
+    @Override
+    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
+        jobStatus = newJobStatus;
+        stateTimestamps[jobStatus.ordinal()] = timestamp;
+    }
+
+    @Override
+    public JobStatus getState() {
+        return jobStatus;
+    }
+
+    @Override
+    public long getStatusTimestamp(JobStatus status) {
+        return stateTimestamps[status.ordinal()];
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index ac9aca7..180921e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -49,8 +51,12 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.JobStatusProvider;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -579,14 +585,20 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
     @Override
     public final void startScheduling() {
         mainThreadExecutor.assertRunningInMainThread();
-        registerJobMetrics();
+        registerJobMetrics(jobManagerJobMetricGroup, executionGraph, this::getNumberOfRestarts);
         operatorCoordinatorHandler.startAllOperatorCoordinators();
         startSchedulingInternal();
     }
 
-    private void registerJobMetrics() {
-        jobManagerJobMetricGroup.gauge(MetricNames.NUM_RESTARTS, this::getNumberOfRestarts);
-        jobManagerJobMetricGroup.gauge(MetricNames.FULL_RESTARTS, this::getNumberOfRestarts);
+    public static void registerJobMetrics(
+            MetricGroup metrics,
+            JobStatusProvider jobStatusProvider,
+            Gauge<Long> numberOfRestarts) {
+        metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(jobStatusProvider));
+        metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(jobStatusProvider));
+        metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(jobStatusProvider));
+        metrics.gauge(MetricNames.NUM_RESTARTS, numberOfRestarts);
+        metrics.gauge(MetricNames.FULL_RESTARTS, numberOfRestarts);
     }
 
     protected abstract void startSchedulingInternal();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 796b9c4..4333881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -71,7 +70,6 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@@ -86,6 +84,7 @@ import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
 import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
 import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.JobStatusStore;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
@@ -116,6 +115,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Optional;
@@ -178,7 +178,7 @@ public class AdaptiveScheduler
     private final ComponentMainThreadExecutor componentMainThreadExecutor;
     private final FatalErrorHandler fatalErrorHandler;
 
-    private final JobStatusListener jobStatusListener;
+    private final Collection<JobStatusListener> jobStatusListeners;
 
     private final SlotAllocator slotAllocator;
 
@@ -255,7 +255,10 @@ public class AdaptiveScheduler
         declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
 
         this.componentMainThreadExecutor = mainThreadExecutor;
-        this.jobStatusListener = Preconditions.checkNotNull(jobStatusListener);
+
+        final JobStatusStore jobStatusStore = new JobStatusStore(initializationTimestamp);
+        this.jobStatusListeners =
+                Arrays.asList(Preconditions.checkNotNull(jobStatusListener), jobStatusStore);
 
         this.scaleUpController = new ReactiveScaleUpController(configuration);
 
@@ -265,7 +268,8 @@ public class AdaptiveScheduler
 
         this.executionGraphFactory = executionGraphFactory;
 
-        registerMetrics();
+        SchedulerBase.registerJobMetrics(
+                jobManagerJobMetricGroup, jobStatusStore, () -> (long) numRestarts);
     }
 
     private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException {
@@ -392,12 +396,6 @@ public class AdaptiveScheduler
                 "newResourcesAvailable");
     }
 
-    private void registerMetrics() {
-        final Gauge<Integer> numRestartsMetric = () -> numRestarts;
-        jobManagerJobMetricGroup.gauge(MetricNames.NUM_RESTARTS, numRestartsMetric);
-        jobManagerJobMetricGroup.gauge(MetricNames.FULL_RESTARTS, numRestartsMetric);
-    }
-
     @Override
     public void startScheduling() {
         state.as(Created.class)
@@ -1157,10 +1155,11 @@ public class AdaptiveScheduler
             final JobStatus newJobStatus = state.getJobStatus();
 
             if (previousJobStatus != newJobStatus) {
-                jobStatusListener.jobStatusChanges(
-                        jobInformation.getJobID(),
-                        state.getJobStatus(),
-                        System.currentTimeMillis());
+                final long timestamp = System.currentTimeMillis();
+                jobStatusListeners.forEach(
+                        listener ->
+                                listener.jobStatusChanges(
+                                        jobInformation.getJobID(), newJobStatus, timestamp));
             }
 
             return targetStateInstance;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java
new file mode 100644
index 0000000..4713b11
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+class JobStatusStoreTest {
+
+    @Test
+    void initialState() {
+        final JobStatusStore store = new JobStatusStore(0);
+        assertThat(store.getState(), is(JobStatus.INITIALIZING));
+    }
+
+    @Test
+    void initialTimestamps() {
+        final JobStatusStore store = new JobStatusStore(967823L);
+
+        for (JobStatus jobStatus : JobStatus.values()) {
+            switch (jobStatus) {
+                case INITIALIZING:
+                    assertThat(store.getStatusTimestamp(JobStatus.INITIALIZING), is(967823L));
+                    break;
+                default:
+                    assertThat(store.getStatusTimestamp(jobStatus), is(0L));
+            }
+        }
+    }
+
+    @Test
+    void getState() {
+        final JobStatusStore store = new JobStatusStore(0L);
+        store.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L);
+
+        assertThat(store.getState(), is(JobStatus.RUNNING));
+    }
+
+    @Test
+    void getStatusTimestamp() {
+        final JobStatusStore store = new JobStatusStore(0L);
+        store.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L);
+
+        assertThat(store.getStatusTimestamp(JobStatus.RUNNING), is(1L));
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 92658ec..458098d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -44,6 +44,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -108,6 +111,7 @@ import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGr
 import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
 import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
@@ -412,13 +416,13 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
     @Test
     public void testNumRestartsMetric() throws Exception {
-        final CompletableFuture<Gauge<Integer>> numRestartsMetricFuture = new CompletableFuture<>();
+        final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new CompletableFuture<>();
         final MetricRegistry metricRegistry =
                 TestingMetricRegistry.builder()
                         .setRegisterConsumer(
                                 (metric, name, group) -> {
                                     if (MetricNames.NUM_RESTARTS.equals(name)) {
-                                        numRestartsMetricFuture.complete((Gauge<Integer>) metric);
+                                        numRestartsMetricFuture.complete((Gauge<Long>) metric);
                                     }
                                 })
                         .build();
@@ -447,7 +451,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .build();
 
-        final Gauge<Integer> numRestartsMetric = numRestartsMetricFuture.get();
+        final Gauge<Long> numRestartsMetric = numRestartsMetricFuture.get();
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
@@ -469,7 +473,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
         // wait for the first task submission
         taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
 
-        assertThat(numRestartsMetric.getValue(), is(0));
+        assertThat(numRestartsMetric.getValue(), is(0L));
 
         singleThreadMainThreadExecutor.execute(
                 () -> {
@@ -485,7 +489,102 @@ public class AdaptiveSchedulerTest extends TestLogger {
         // wait for the second task submissions
         taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5));
 
-        assertThat(numRestartsMetric.getValue(), is(1));
+        assertThat(numRestartsMetric.getValue(), is(1L));
+    }
+
+    @Test
+    public void testStatusMetrics() throws Exception {
+        final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new CompletableFuture<>();
+        final CompletableFuture<DownTimeGauge> downTimeMetricFuture = new CompletableFuture<>();
+        final CompletableFuture<RestartTimeGauge> restartTimeMetricFuture =
+                new CompletableFuture<>();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    switch (name) {
+                                        case UpTimeGauge.METRIC_NAME:
+                                            upTimeMetricFuture.complete((UpTimeGauge) metric);
+                                            break;
+                                        case DownTimeGauge.METRIC_NAME:
+                                            downTimeMetricFuture.complete((DownTimeGauge) metric);
+                                            break;
+                                        case RestartTimeGauge.METRIC_NAME:
+                                            restartTimeMetricFuture.complete(
+                                                    (RestartTimeGauge) metric);
+                                            break;
+                                    }
+                                })
+                        .build();
+
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setJobManagerJobMetricGroup(
+                                JobManagerMetricGroup.createJobManagerMetricGroup(
+                                                metricRegistry, "localhost")
+                                        .addJob(new JobID(), "jobName"))
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        final UpTimeGauge upTimeGauge = upTimeMetricFuture.get();
+        final DownTimeGauge downTimeGauge = downTimeMetricFuture.get();
+        final RestartTimeGauge restartTimeGauge = restartTimeMetricFuture.get();
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
+
+        taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        // wait for the first task submission
+        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+
+        // sleep a bit to ensure uptime is > 0
+        Thread.sleep(10L);
+
+        assertThat(upTimeGauge.getValue(), greaterThan(0L));
+        assertThat(downTimeGauge.getValue(), is(0L));
+        assertThat(restartTimeGauge.getValue(), is(0L));
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    // offer more slots, which will cause a restart in order to scale up
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        // wait for the second task submissions
+        taskManagerGateway.waitForSubmissions(2, Duration.ofSeconds(5));
+
+        // sleep a bit to ensure uptime is > 0
+        Thread.sleep(10L);
+
+        assertThat(upTimeGauge.getValue(), greaterThan(0L));
+        assertThat(downTimeGauge.getValue(), is(0L));
+        assertThat(restartTimeGauge.getValue(), greaterThan(0L));
     }
 
     // ---------------------------------------------------------------------------------------------
@@ -1127,7 +1226,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         @Override
         public JobStatus getJobStatus() {
-            return null;
+            return JobStatus.RUNNING;
         }
 
         @Override

[flink] 02/03: [hotfix] Remove MetricGroup parameter from EG builder

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc37f0f2d9108d618be5beed57a073d6205104a3
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Nov 8 15:14:59 2021 +0100

    [hotfix] Remove MetricGroup parameter from EG builder
---
 .../runtime/executiongraph/DefaultExecutionGraphBuilder.java      | 2 --
 .../flink/runtime/scheduler/DefaultExecutionGraphFactory.java     | 1 -
 .../executiongraph/TestingDefaultExecutionGraphBuilder.java       | 8 --------
 3 files changed, 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index ff1af8f..35c5837 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -80,7 +79,6 @@ public class DefaultExecutionGraphBuilder {
             CheckpointsCleaner checkpointsCleaner,
             CheckpointIDCounter checkpointIdCounter,
             Time rpcTimeout,
-            MetricGroup metrics,
             BlobWriter blobWriter,
             Logger log,
             ShuffleMaster<?> shuffleMaster,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index 0f49909..8284572 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -126,7 +126,6 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
                         checkpointsCleaner,
                         checkpointIdCounter,
                         rpcTimeout,
-                        jobManagerJobMetricGroup,
                         blobWriter,
                         log,
                         shuffleMaster,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index edf85c4..5842208 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
@@ -70,7 +69,6 @@ public class TestingDefaultExecutionGraphBuilder {
     private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
     private Configuration jobMasterConfig = new Configuration();
     private JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
-    private MetricGroup metricGroup = new UnregisteredMetricsGroup();
     private CompletedCheckpointStore completedCheckpointStore =
             new StandaloneCompletedCheckpointStore(1);
     private CheckpointIDCounter checkpointIdCounter = new StandaloneCheckpointIDCounter();
@@ -128,11 +126,6 @@ public class TestingDefaultExecutionGraphBuilder {
         return this;
     }
 
-    public TestingDefaultExecutionGraphBuilder setMetricGroup(MetricGroup metricGroup) {
-        this.metricGroup = metricGroup;
-        return this;
-    }
-
     public TestingDefaultExecutionGraphBuilder setCompletedCheckpointStore(
             CompletedCheckpointStore completedCheckpointStore) {
         this.completedCheckpointStore = completedCheckpointStore;
@@ -174,7 +167,6 @@ public class TestingDefaultExecutionGraphBuilder {
                 new CheckpointsCleaner(),
                 checkpointIdCounter,
                 rpcTimeout,
-                metricGroup,
                 blobWriter,
                 LOG,
                 shuffleMaster,

[flink] 03/03: [FLINK-24903][coordination] Harden AdaptiveSchedulerTest

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60f2f3c045df34655e2b51d4b248039eea7ee883
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Nov 15 16:42:59 2021 +0100

    [FLINK-24903][coordination] Harden AdaptiveSchedulerTest
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 26 ++++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 458098d..5596804 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -92,7 +92,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -112,7 +111,6 @@ import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlot
 import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
@@ -744,13 +742,26 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
 
-        final Collection<JobStatus> jobStatusNotifications = new ArrayList<>();
+        final CompletableFuture<Void> jobRunningNotification = new CompletableFuture<>();
+        final CompletableFuture<Void> jobFinishedNotification = new CompletableFuture<>();
+        final CompletableFuture<JobStatus> unexpectedJobStatusNotification =
+                new CompletableFuture<>();
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
                         .setJobMasterConfiguration(configuration)
                         .setJobStatusListener(
-                                (jobId, newJobStatus, timestamp) ->
-                                        jobStatusNotifications.add(newJobStatus))
+                                (jobId, newJobStatus, timestamp) -> {
+                                    switch (newJobStatus) {
+                                        case RUNNING:
+                                            jobRunningNotification.complete(null);
+                                            break;
+                                        case FINISHED:
+                                            jobFinishedNotification.complete(null);
+                                            break;
+                                        default:
+                                            unexpectedJobStatusNotification.complete(newJobStatus);
+                                    }
+                                })
                         .setDeclarativeSlotPool(declarativeSlotPool)
                         .build();
 
@@ -778,9 +789,10 @@ public class AdaptiveSchedulerTest extends TestLogger {
                                 new TaskExecutionState(
                                         submittedTask.getExecutionAttemptId(),
                                         ExecutionState.FINISHED)));
-        scheduler.getJobTerminationFuture().get();
 
-        assertThat(jobStatusNotifications, hasItems(JobStatus.RUNNING, JobStatus.FINISHED));
+        jobRunningNotification.get();
+        jobFinishedNotification.get();
+        assertThat(unexpectedJobStatusNotification.isDone(), is(false));
     }
 
     @Test