You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/11/22 21:37:54 UTC
[flink] 01/03: [FLINK-24775][coordination] move JobStatus-related metrics out of ExecutionGraph
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f35a76c1e96a4cce51a7a5d335128769f0e196cc
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Nov 4 15:12:28 2021 +0100
[FLINK-24775][coordination] move JobStatus-related metrics out of ExecutionGraph
---
.../DefaultExecutionGraphBuilder.java | 9 --
.../flink/runtime/scheduler/JobStatusStore.java | 50 ++++++++++
.../flink/runtime/scheduler/SchedulerBase.java | 20 +++-
.../scheduler/adaptive/AdaptiveScheduler.java | 29 +++---
.../runtime/scheduler/JobStatusStoreTest.java | 66 ++++++++++++
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 111 +++++++++++++++++++--
6 files changed, 251 insertions(+), 34 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index d1c6d6f..ff1af8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -37,9 +37,6 @@ import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategyFactoryLoader;
-import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -315,12 +312,6 @@ public class DefaultExecutionGraphBuilder {
checkpointsCleaner);
}
- // create all the metrics for the Execution Graph
-
- metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
- metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
- metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
-
return executionGraph;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java
new file mode 100644
index 0000000..36fccb8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/JobStatusStore.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.JobStatusProvider;
+
+/** Listens for job status changes and exposes the most recently reported job state together with the timestamp recorded for each state. */
+public class JobStatusStore implements JobStatusListener, JobStatusProvider {
+ // Timestamps indexed by JobStatus#ordinal(); entries that were never written keep the array default of 0.
+ private final long[] stateTimestamps = new long[JobStatus.values().length];
+ private JobStatus jobStatus = JobStatus.INITIALIZING; // NOTE(review): plain, unsynchronized field -- confirm reads and updates happen on the same thread
+ /** Creates a store in the INITIALIZING state, using the given timestamp as the time that state was entered. */
+ public JobStatusStore(long initializationTimestamp) {
+ stateTimestamps[JobStatus.INITIALIZING.ordinal()] = initializationTimestamp;
+ }
+ /** Makes the new status current and records its timestamp, replacing any earlier timestamp for that status; jobId is not used. */
+ @Override
+ public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
+ jobStatus = newJobStatus;
+ stateTimestamps[jobStatus.ordinal()] = timestamp;
+ }
+ /** Returns the most recently reported status, or INITIALIZING if no change has been reported yet. */
+ @Override
+ public JobStatus getState() {
+ return jobStatus;
+ }
+ /** Returns the timestamp last recorded for the given status, or 0 if none was recorded. */
+ @Override
+ public long getStatusTimestamp(JobStatus status) {
+ return stateTimestamps[status.ordinal()];
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index ac9aca7..180921e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -49,8 +51,12 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.JobStatusProvider;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
+import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -579,14 +585,20 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
@Override
public final void startScheduling() {
mainThreadExecutor.assertRunningInMainThread();
- registerJobMetrics();
+ registerJobMetrics(jobManagerJobMetricGroup, executionGraph, this::getNumberOfRestarts);
operatorCoordinatorHandler.startAllOperatorCoordinators();
startSchedulingInternal();
}
- private void registerJobMetrics() {
- jobManagerJobMetricGroup.gauge(MetricNames.NUM_RESTARTS, this::getNumberOfRestarts);
- jobManagerJobMetricGroup.gauge(MetricNames.FULL_RESTARTS, this::getNumberOfRestarts);
+ public static void registerJobMetrics( // registers the restart-time, down-time, up-time and restart-count gauges on the given group
+ MetricGroup metrics, // group on which all five gauges are registered
+ JobStatusProvider jobStatusProvider, // handed to each of the three time gauges
+ Gauge<Long> numberOfRestarts) { // the same gauge is registered under both NUM_RESTARTS and FULL_RESTARTS
+ metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(jobStatusProvider));
+ metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(jobStatusProvider));
+ metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(jobStatusProvider));
+ metrics.gauge(MetricNames.NUM_RESTARTS, numberOfRestarts);
+ metrics.gauge(MetricNames.FULL_RESTARTS, numberOfRestarts);
}
protected abstract void startSchedulingInternal();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 796b9c4..4333881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
-import org.apache.flink.metrics.Gauge;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -71,7 +70,6 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@@ -86,6 +84,7 @@ import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.JobStatusStore;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
@@ -116,6 +115,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Optional;
@@ -178,7 +178,7 @@ public class AdaptiveScheduler
private final ComponentMainThreadExecutor componentMainThreadExecutor;
private final FatalErrorHandler fatalErrorHandler;
- private final JobStatusListener jobStatusListener;
+ private final Collection<JobStatusListener> jobStatusListeners;
private final SlotAllocator slotAllocator;
@@ -255,7 +255,10 @@ public class AdaptiveScheduler
declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
this.componentMainThreadExecutor = mainThreadExecutor;
- this.jobStatusListener = Preconditions.checkNotNull(jobStatusListener);
+
+ final JobStatusStore jobStatusStore = new JobStatusStore(initializationTimestamp);
+ this.jobStatusListeners =
+ Arrays.asList(Preconditions.checkNotNull(jobStatusListener), jobStatusStore);
this.scaleUpController = new ReactiveScaleUpController(configuration);
@@ -265,7 +268,8 @@ public class AdaptiveScheduler
this.executionGraphFactory = executionGraphFactory;
- registerMetrics();
+ SchedulerBase.registerJobMetrics(
+ jobManagerJobMetricGroup, jobStatusStore, () -> (long) numRestarts);
}
private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException {
@@ -392,12 +396,6 @@ public class AdaptiveScheduler
"newResourcesAvailable");
}
- private void registerMetrics() {
- final Gauge<Integer> numRestartsMetric = () -> numRestarts;
- jobManagerJobMetricGroup.gauge(MetricNames.NUM_RESTARTS, numRestartsMetric);
- jobManagerJobMetricGroup.gauge(MetricNames.FULL_RESTARTS, numRestartsMetric);
- }
-
@Override
public void startScheduling() {
state.as(Created.class)
@@ -1157,10 +1155,11 @@ public class AdaptiveScheduler
final JobStatus newJobStatus = state.getJobStatus();
if (previousJobStatus != newJobStatus) {
- jobStatusListener.jobStatusChanges(
- jobInformation.getJobID(),
- state.getJobStatus(),
- System.currentTimeMillis());
+ final long timestamp = System.currentTimeMillis();
+ jobStatusListeners.forEach(
+ listener ->
+ listener.jobStatusChanges(
+ jobInformation.getJobID(), newJobStatus, timestamp));
}
return targetStateInstance;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java
new file mode 100644
index 0000000..4713b11
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/JobStatusStoreTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+class JobStatusStoreTest { // Tests for JobStatusStore.
+ /** A new store reports INITIALIZING before any status change is delivered. */
+ @Test
+ void initialState() {
+ final JobStatusStore store = new JobStatusStore(0);
+ assertThat(store.getState(), is(JobStatus.INITIALIZING));
+ }
+ /** Only INITIALIZING carries the constructor timestamp; every other status starts at 0. */
+ @Test
+ void initialTimestamps() {
+ final JobStatusStore store = new JobStatusStore(967823L);
+
+ for (JobStatus jobStatus : JobStatus.values()) {
+ switch (jobStatus) {
+ case INITIALIZING:
+ assertThat(store.getStatusTimestamp(JobStatus.INITIALIZING), is(967823L));
+ break;
+ default:
+ assertThat(store.getStatusTimestamp(jobStatus), is(0L));
+ }
+ }
+ }
+ /** The status passed to jobStatusChanges becomes the current state. */
+ @Test
+ void getState() {
+ final JobStatusStore store = new JobStatusStore(0L);
+ store.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L);
+
+ assertThat(store.getState(), is(JobStatus.RUNNING));
+ }
+ /** The timestamp passed to jobStatusChanges is stored for the reported status. */
+ @Test
+ void getStatusTimestamp() {
+ final JobStatusStore store = new JobStatusStore(0L);
+ store.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L);
+
+ assertThat(store.getStatusTimestamp(JobStatus.RUNNING), is(1L));
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 92658ec..458098d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -44,6 +44,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -108,6 +111,7 @@ import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGr
import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.Is.is;
@@ -412,13 +416,13 @@ public class AdaptiveSchedulerTest extends TestLogger {
@Test
public void testNumRestartsMetric() throws Exception {
- final CompletableFuture<Gauge<Integer>> numRestartsMetricFuture = new CompletableFuture<>();
+ final CompletableFuture<Gauge<Long>> numRestartsMetricFuture = new CompletableFuture<>();
final MetricRegistry metricRegistry =
TestingMetricRegistry.builder()
.setRegisterConsumer(
(metric, name, group) -> {
if (MetricNames.NUM_RESTARTS.equals(name)) {
- numRestartsMetricFuture.complete((Gauge<Integer>) metric);
+ numRestartsMetricFuture.complete((Gauge<Long>) metric);
}
})
.build();
@@ -447,7 +451,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
.setDeclarativeSlotPool(declarativeSlotPool)
.build();
- final Gauge<Integer> numRestartsMetric = numRestartsMetricFuture.get();
+ final Gauge<Long> numRestartsMetric = numRestartsMetricFuture.get();
final SubmissionBufferingTaskManagerGateway taskManagerGateway =
new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
@@ -469,7 +473,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
// wait for the first task submission
taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
- assertThat(numRestartsMetric.getValue(), is(0));
+ assertThat(numRestartsMetric.getValue(), is(0L));
singleThreadMainThreadExecutor.execute(
() -> {
@@ -485,7 +489,102 @@ public class AdaptiveSchedulerTest extends TestLogger {
// wait for the second task submissions
taskManagerGateway.waitForSubmissions(PARALLELISM, Duration.ofSeconds(5));
- assertThat(numRestartsMetric.getValue(), is(1));
+ assertThat(numRestartsMetric.getValue(), is(1L));
+ }
+
+ @Test
+ public void testStatusMetrics() throws Exception { // checks the up-time, down-time and restart-time gauges before and after a scale-up restart
+ final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new CompletableFuture<>();
+ final CompletableFuture<DownTimeGauge> downTimeMetricFuture = new CompletableFuture<>();
+ final CompletableFuture<RestartTimeGauge> restartTimeMetricFuture =
+ new CompletableFuture<>();
+ final MetricRegistry metricRegistry =
+ TestingMetricRegistry.builder()
+ .setRegisterConsumer(
+ (metric, name, group) -> {
+ switch (name) {
+ case UpTimeGauge.METRIC_NAME:
+ upTimeMetricFuture.complete((UpTimeGauge) metric);
+ break;
+ case DownTimeGauge.METRIC_NAME:
+ downTimeMetricFuture.complete((DownTimeGauge) metric);
+ break;
+ case RestartTimeGauge.METRIC_NAME:
+ restartTimeMetricFuture.complete(
+ (RestartTimeGauge) metric);
+ break;
+ } // metrics registered under any other name are ignored
+ })
+ .build();
+ // The register consumer above completes each future once a metric with the matching name is registered.
+ final JobGraph jobGraph = createJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ createDeclarativeSlotPool(jobGraph.getJobID());
+
+ final Configuration configuration = new Configuration();
+ configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
+ configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L));
+
+ final AdaptiveScheduler scheduler =
+ new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+ .setJobMasterConfiguration(configuration)
+ .setJobManagerJobMetricGroup(
+ JobManagerMetricGroup.createJobManagerMetricGroup(
+ metricRegistry, "localhost")
+ .addJob(new JobID(), "jobName"))
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+ // get() blocks until the register consumer has received the corresponding gauge.
+ final UpTimeGauge upTimeGauge = upTimeMetricFuture.get();
+ final DownTimeGauge downTimeGauge = downTimeMetricFuture.get();
+ final RestartTimeGauge restartTimeGauge = restartTimeMetricFuture.get();
+
+ final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+ new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
+
+ taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ scheduler.startScheduling();
+
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+ taskManagerGateway);
+ });
+
+ // wait for the first task submission
+ taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+
+ // sleep a bit to ensure uptime is > 0
+ Thread.sleep(10L);
+
+ assertThat(upTimeGauge.getValue(), greaterThan(0L));
+ assertThat(downTimeGauge.getValue(), is(0L));
+ assertThat(restartTimeGauge.getValue(), is(0L));
+
+ singleThreadMainThreadExecutor.execute(
+ () -> {
+ // offer more slots, which will cause a restart in order to scale up
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+ ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+ taskManagerGateway);
+ });
+
+ // wait for the second task submissions
+ taskManagerGateway.waitForSubmissions(2, Duration.ofSeconds(5));
+
+ // sleep a bit to ensure uptime is > 0
+ Thread.sleep(10L);
+ // After the restart only the restart-time expectation differs from the first round of assertions.
+ assertThat(upTimeGauge.getValue(), greaterThan(0L));
+ assertThat(downTimeGauge.getValue(), is(0L));
+ assertThat(restartTimeGauge.getValue(), greaterThan(0L));
}
// ---------------------------------------------------------------------------------------------
@@ -1127,7 +1226,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
@Override
public JobStatus getJobStatus() {
- return null;
+ return JobStatus.RUNNING;
}
@Override