You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/23 15:23:40 UTC

[flink] 04/04: [FLINK-12432][runtime] Add SchedulerNG stub implementation

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed31f4c76aa47ff9ded9c46927e4c1e97510088d
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed May 15 13:57:15 2019 +0200

    [FLINK-12432][runtime] Add SchedulerNG stub implementation
    
    Add new SchedulerNG stub implementation, which will represent the future
    default scheduler.
    
    Add feature toggle to switch between existing scheduler and stub
    implementation.
    
    Add ThrowingRestartStrategy to validate that in new scheduling code paths, the
    legacy restart strategies are not used.
    
    This closes #8452.
---
 .../flink/configuration/JobManagerOptions.java     | 13 ++++
 .../dispatcher/DefaultJobManagerRunnerFactory.java |  3 +-
 .../dispatcher/SchedulerNGFactoryFactory.java      | 52 +++++++++++++++
 .../restart/ThrowingRestartStrategy.java           | 53 +++++++++++++++
 .../flink/runtime/scheduler/DefaultScheduler.java  | 78 ++++++++++++++++++++++
 .../runtime/scheduler/DefaultSchedulerFactory.java | 73 ++++++++++++++++++++
 .../dispatcher/SchedulerNGFactoryFactoryTest.java  | 77 +++++++++++++++++++++
 .../ThrowingRestartStrategyFactoryTest.java        | 67 +++++++++++++++++++
 8 files changed, 414 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 69f445f..a1d55b1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -160,6 +160,21 @@ public class JobManagerOptions {
 			// default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
 			.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
 			.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
+
+	/**
+	 * Config parameter determining the scheduler implementation: {@code "legacy"} (the default) or
+	 * {@code "ng"} (the new generation scheduler, which is still in development).
+	 */
+	@Documentation.ExcludeFromDocumentation("SchedulerNG is still in development.")
+	public static final ConfigOption<String> SCHEDULER =
+		key("jobmanager.scheduler")
+			.defaultValue("legacy")
+			.withDescription(Description.builder()
+				.text("Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
+				.list(
+					text("'legacy': legacy scheduler"),
+					text("'ng': new generation scheduler"))
+				.build());
 
 	// ---------------------------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index b7f80b6..c0707c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -57,8 +57,9 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
 
 		final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration);
 		final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration);
-		final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
-			jobManagerServices.getRestartStrategyFactory());
+		final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(
+			configuration,
+			jobManagerServices.getRestartStrategyFactory());
 
 		final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
 			jobMasterConfiguration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
new file mode 100644
index 0000000..a757bae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
+import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+
+/**
+ * Creates the {@link SchedulerNGFactory} selected by {@link JobManagerOptions#SCHEDULER}.
+ */
+final class SchedulerNGFactoryFactory {
+
+	private SchedulerNGFactoryFactory() {}
+
+	/**
+	 * Creates the scheduler factory configured via {@link JobManagerOptions#SCHEDULER}.
+	 *
+	 * @param configuration configuration from which the scheduler name is read
+	 * @param restartStrategyFactory restart strategy factory handed to the legacy scheduler; the
+	 *     new generation scheduler factory does not receive it
+	 * @return a {@link LegacySchedulerFactory} for {@code "legacy"}, a {@link DefaultSchedulerFactory}
+	 *     for {@code "ng"}
+	 * @throws IllegalArgumentException if the configured scheduler name is not recognized
+	 */
+	static SchedulerNGFactory createSchedulerNGFactory(
+			final Configuration configuration,
+			final RestartStrategyFactory restartStrategyFactory) {
+
+		final String schedulerName = configuration.getString(JobManagerOptions.SCHEDULER);
+		switch (schedulerName) {
+			case "legacy":
+				return new LegacySchedulerFactory(restartStrategyFactory);
+
+			case "ng":
+				return new DefaultSchedulerFactory();
+
+			default:
+				throw new IllegalArgumentException(String.format(
+					"Illegal value [%s] for config option [%s]. Accepted values are [legacy, ng].",
+					schedulerName,
+					JobManagerOptions.SCHEDULER.key()));
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
new file mode 100644
index 0000000..e7355df
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+/**
+ * A {@link RestartStrategy} for code paths that must not use legacy restart strategies: every
+ * call to {@link #canRestart()} or {@link #restart(RestartCallback, ScheduledExecutor)} throws an
+ * {@link IllegalStateException}.
+ */
+public class ThrowingRestartStrategy implements RestartStrategy {
+
+	@Override
+	public boolean canRestart() {
+		throw unexpectedCall("canRestart");
+	}
+
+	@Override
+	public void restart(final RestartCallback restarter, final ScheduledExecutor executor) {
+		throw unexpectedCall("restart");
+	}
+
+	/**
+	 * Creates the exception signalling that {@code methodName} was called although it must not be.
+	 */
+	private static IllegalStateException unexpectedCall(final String methodName) {
+		return new IllegalStateException("Unexpected " + methodName + "() call");
+	}
+
+	/**
+	 * Factory handing out a new {@link ThrowingRestartStrategy} on every call.
+	 */
+	public static class ThrowingRestartStrategyFactory extends RestartStrategyFactory {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public RestartStrategy createRestartStrategy() {
+			return new ThrowingRestartStrategy();
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
new file mode 100644
index 0000000..427734a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Stub implementation of the future default scheduler.
+ *
+ * <p>It currently builds on {@link LegacyScheduler}, handing it a
+ * {@link ThrowingRestartStrategy.ThrowingRestartStrategyFactory} so that any use of the legacy
+ * restart strategy fails immediately. {@link #startScheduling()} is not implemented yet.
+ */
+public class DefaultScheduler extends LegacyScheduler {
+
+	public DefaultScheduler(
+			final Logger log,
+			final JobGraph jobGraph,
+			final BackPressureStatsTracker backPressureStatsTracker,
+			final Executor ioExecutor,
+			final Configuration jobMasterConfiguration,
+			final SlotProvider slotProvider,
+			final ScheduledExecutorService futureExecutor,
+			final ClassLoader userCodeLoader,
+			final CheckpointRecoveryFactory checkpointRecoveryFactory,
+			final Time rpcTimeout,
+			final BlobWriter blobWriter,
+			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+			final Time slotRequestTimeout) throws Exception {
+
+		super(
+			log,
+			jobGraph,
+			backPressureStatsTracker,
+			ioExecutor,
+			jobMasterConfiguration,
+			slotProvider,
+			futureExecutor,
+			userCodeLoader,
+			checkpointRecoveryFactory,
+			rpcTimeout,
+			new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
+			blobWriter,
+			jobManagerJobMetricGroup,
+			slotRequestTimeout);
+	}
+
+	@Override
+	public void startScheduling() {
+		throw new UnsupportedOperationException("DefaultScheduler is a stub and does not support scheduling yet.");
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
new file mode 100644
index 0000000..71f8802
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * {@link SchedulerNGFactory} creating {@link DefaultScheduler} instances.
+ *
+ * <p>Unlike {@link LegacySchedulerFactory}, this factory takes no restart strategy factory:
+ * {@link DefaultScheduler} always supplies its own.
+ */
+public class DefaultSchedulerFactory implements SchedulerNGFactory {
+
+	@Override
+	public SchedulerNG createInstance(
+			final Logger log,
+			final JobGraph jobGraph,
+			final BackPressureStatsTracker backPressureStatsTracker,
+			final Executor ioExecutor,
+			final Configuration jobMasterConfiguration,
+			final SlotProvider slotProvider,
+			final ScheduledExecutorService futureExecutor,
+			final ClassLoader userCodeLoader,
+			final CheckpointRecoveryFactory checkpointRecoveryFactory,
+			final Time rpcTimeout,
+			final BlobWriter blobWriter,
+			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+			final Time slotRequestTimeout) throws Exception {
+
+		return new DefaultScheduler(
+			log,
+			jobGraph,
+			backPressureStatsTracker,
+			ioExecutor,
+			jobMasterConfiguration,
+			slotProvider,
+			futureExecutor,
+			userCodeLoader,
+			checkpointRecoveryFactory,
+			rpcTimeout,
+			blobWriter,
+			jobManagerJobMetricGroup,
+			slotRequestTimeout);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
new file mode 100644
index 0000000..cfaf0f0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
+import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link SchedulerNGFactoryFactory}.
+ */
+public class SchedulerNGFactoryFactoryTest extends TestLogger {
+
+	private static final NoRestartStrategy.NoRestartStrategyFactory TEST_RESTART_STRATEGY_FACTORY = new NoRestartStrategy.NoRestartStrategyFactory();
+
+	@Test
+	public void createLegacySchedulerFactoryByDefault() {
+		final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(new Configuration());
+		assertThat(schedulerNGFactory, is(instanceOf(LegacySchedulerFactory.class)));
+	}
+
+	@Test
+	public void createSchedulerNGFactoryIfConfigured() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULER, "ng");
+
+		final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(configuration);
+
+		assertThat(schedulerNGFactory, is(instanceOf(DefaultSchedulerFactory.class)));
+	}
+
+	@Test
+	public void throwsExceptionIfSchedulerNameIsInvalid() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULER, "invalid-scheduler-name");
+
+		try {
+			createSchedulerNGFactory(configuration);
+			fail("Expected exception not thrown");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.getMessage(), containsString("Illegal value [invalid-scheduler-name]"));
+		}
+	}
+
+	private static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
+		return SchedulerNGFactoryFactory.createSchedulerNGFactory(
+			configuration,
+			TEST_RESTART_STRATEGY_FACTORY);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategyFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategyFactoryTest.java
new file mode 100644
index 0000000..dd4f399
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategyFactoryTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ThrowingRestartStrategy} and {@link ThrowingRestartStrategy.ThrowingRestartStrategyFactory}.
+ */
+public class ThrowingRestartStrategyFactoryTest extends TestLogger {
+
+	private RestartStrategy restartStrategy;
+
+	@Before
+	public void setUp() {
+		restartStrategy = new ThrowingRestartStrategy();
+	}
+
+	@Test
+	public void restartShouldThrowException() {
+		final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
+
+		try {
+			restartStrategy.restart(new NoOpRestarter(), manuallyTriggeredScheduledExecutor);
+			fail("Expected exception not thrown");
+		} catch (IllegalStateException e) {
+			assertThat(e.getMessage(), is(equalTo("Unexpected restart() call")));
+			assertThat(manuallyTriggeredScheduledExecutor.numQueuedRunnables(), is(equalTo(0)));
+		}
+	}
+
+	@Test
+	public void canRestartShouldThrowException() {
+		try {
+			restartStrategy.canRestart();
+			fail("Expected exception not thrown");
+		} catch (IllegalStateException e) {
+			assertThat(e.getMessage(), is(equalTo("Unexpected canRestart() call")));
+		}
+	}
+
+	@Test
+	public void factoryCreatesThrowingRestartStrategy() {
+		final RestartStrategy createdRestartStrategy =
+			new ThrowingRestartStrategy.ThrowingRestartStrategyFactory().createRestartStrategy();
+
+		assertThat(createdRestartStrategy, is(instanceOf(ThrowingRestartStrategy.class)));
+	}
+}