You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/23 15:23:36 UTC

[flink] branch master updated (9f19345 -> ed31f4c)

This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9f19345  [hotfix][connector-hive] Fix Hive type mapping to Table API type information
     new e10e9ef  [hotfix][core] Delete unused config option jobmanager.resourcemanager.reconnect-interval
     new aa9b899  [hotfix][runtime] Fix checkstyle violations in RestartStrategyFactory
     new f9f43a5  [hotfix][runtime] Move scheduling-related classes to new package
     new ed31f4c  [FLINK-12432][runtime] Add SchedulerNG stub implementation

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/job_manager_configuration.html       |  5 --
 .../flink/configuration/JobManagerOptions.java     | 25 +++----
 .../dispatcher/DefaultJobManagerRunnerFactory.java |  6 +-
 .../dispatcher/SchedulerNGFactoryFactory.java      | 52 +++++++++++++++
 .../restart/RestartStrategyFactory.java            |  9 ++-
 .../restart/ThrowingRestartStrategy.java           | 53 +++++++++++++++
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  2 +
 .../factories/DefaultJobMasterServiceFactory.java  |  2 +-
 .../DefaultScheduler.java}                         | 28 ++++----
 .../DefaultSchedulerFactory.java}                  | 19 ++----
 .../{jobmaster => scheduler}/LegacyScheduler.java  |  3 +-
 .../LegacySchedulerFactory.java                    |  2 +-
 .../{jobmaster => scheduler}/SchedulerNG.java      |  3 +-
 .../SchedulerNGFactory.java                        |  2 +-
 .../dispatcher/SchedulerNGFactoryFactoryTest.java  | 77 ++++++++++++++++++++++
 .../ThrowingRestartStrategyFactoryTest.java        | 67 +++++++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java     |  2 +
 17 files changed, 299 insertions(+), 58 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/LegacySchedulerFactory.java => scheduler/DefaultScheduler.java} (78%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster/LegacySchedulerFactory.java => scheduler/DefaultSchedulerFactory.java} (79%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => scheduler}/LegacyScheduler.java (99%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => scheduler}/LegacySchedulerFactory.java (98%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => scheduler}/SchedulerNG.java (98%)
 rename flink-runtime/src/main/java/org/apache/flink/runtime/{jobmaster => scheduler}/SchedulerNGFactory.java (97%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategyFactoryTest.java


[flink] 02/04: [hotfix][runtime] Fix checkstyle violations in RestartStrategyFactory

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa9b89919ef10570920fdc6b22cabdf244b70508
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed May 15 11:41:38 2019 +0200

    [hotfix][runtime] Fix checkstyle violations in RestartStrategyFactory
---
 .../runtime/executiongraph/restart/RestartStrategyFactory.java   | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index f15ee0b..b5361ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -22,14 +22,19 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
 
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import scala.concurrent.duration.Duration;
+
+/**
+ * Factory for {@link RestartStrategy}.
+ */
 public abstract class RestartStrategyFactory implements Serializable {
 	private static final long serialVersionUID = 7320252552640522191L;
 
@@ -37,7 +42,7 @@ public abstract class RestartStrategyFactory implements Serializable {
 	private static final String CREATE_METHOD = "createFactory";
 
 	/**
-	 * Factory method to create a restart strategy
+	 * Factory method to create a restart strategy.
 	 * @return The created restart strategy
 	 */
 	public abstract RestartStrategy createRestartStrategy();


[flink] 01/04: [hotfix][core] Delete unused config option jobmanager.resourcemanager.reconnect-interval

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e10e9efd22ef76605f9c3a33707221aecccdff0a
Author: Gary Yao <ga...@apache.org>
AuthorDate: Tue May 14 16:43:42 2019 +0200

    [hotfix][core] Delete unused config option jobmanager.resourcemanager.reconnect-interval
---
 docs/_includes/generated/job_manager_configuration.html      |  5 -----
 .../org/apache/flink/configuration/JobManagerOptions.java    | 12 ------------
 2 files changed, 17 deletions(-)

diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index 177c362..73477fe 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -23,11 +23,6 @@
             <td>JVM heap size for the JobManager.</td>
         </tr>
         <tr>
-            <td><h5>jobmanager.resourcemanager.reconnect-interval</h5></td>
-            <td style="word-wrap: break-word;">2000</td>
-            <td>This option specifies the interval in order to trigger a resource manager reconnection if the connection to the resource manager has been lost. This option is only intended for internal use.</td>
-        </tr>
-        <tr>
             <td><h5>jobmanager.rpc.address</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby J [...]
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index a91b931..69f445f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -120,18 +120,6 @@ public class JobManagerOptions {
 				).build());
 
 	/**
-	 * This option specifies the interval in order to trigger a resource manager reconnection if the connection
-	 * to the resource manager has been lost.
-	 *
-	 * <p>This option is only intended for internal use.
-	 */
-	public static final ConfigOption<Long> RESOURCE_MANAGER_RECONNECT_INTERVAL =
-		key("jobmanager.resourcemanager.reconnect-interval")
-		.defaultValue(2000L)
-		.withDescription("This option specifies the interval in order to trigger a resource manager reconnection if the connection" +
-			" to the resource manager has been lost. This option is only intended for internal use.");
-
-	/**
 	 * The location where the JobManager stores the archives of completed jobs.
 	 */
 	public static final ConfigOption<String> ARCHIVE_DIR =


[flink] 03/04: [hotfix][runtime] Move scheduling-related classes to new package

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9f43a51653c4d352e2adf3067254a68a33ed389
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed May 15 13:24:02 2019 +0200

    [hotfix][runtime] Move scheduling-related classes to new package
    
    Move SchedulerNG, SchedulerNGFactory, and its implementations to package
    org.apache.flink.runtime.scheduler.
---
 .../flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java       | 3 +--
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java    | 2 ++
 .../runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java    | 2 +-
 .../apache/flink/runtime/{jobmaster => scheduler}/LegacyScheduler.java | 3 ++-
 .../flink/runtime/{jobmaster => scheduler}/LegacySchedulerFactory.java | 2 +-
 .../org/apache/flink/runtime/{jobmaster => scheduler}/SchedulerNG.java | 3 ++-
 .../flink/runtime/{jobmaster => scheduler}/SchedulerNGFactory.java     | 2 +-
 .../test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java    | 2 ++
 8 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index 6962b6e..b7f80b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -25,8 +25,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
-import org.apache.flink.runtime.jobmaster.LegacySchedulerFactory;
-import org.apache.flink.runtime.jobmaster.SchedulerNGFactory;
 import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory;
 import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
@@ -36,6 +34,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 
 /**
  * Singleton default factory for {@link JobManagerRunner}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 627cc45..cc7139b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -73,6 +73,8 @@ import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index 3f80042..c89ecbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
-import org.apache.flink.runtime.jobmaster.SchedulerNGFactory;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SchedulerFactory;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
similarity index 99%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index b6a256c..80b4bc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.flink.runtime.jobmaster;
+package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -57,6 +57,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacySchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
similarity index 98%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacySchedulerFactory.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
index 33e14b6..12fb557 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacySchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.flink.runtime.jobmaster;
+package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
similarity index 98%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNG.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 98809df..0c8c2c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.flink.runtime.jobmaster;
+package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.queryablestate.KvStateID;
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
similarity index 97%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNGFactory.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index f335252..edd439a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNGFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.flink.runtime.jobmaster;
+package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 6712570..71623fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -105,6 +105,8 @@ import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;


[flink] 04/04: [FLINK-12432][runtime] Add SchedulerNG stub implementation

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed31f4c76aa47ff9ded9c46927e4c1e97510088d
Author: Gary Yao <ga...@apache.org>
AuthorDate: Wed May 15 13:57:15 2019 +0200

    [FLINK-12432][runtime] Add SchedulerNG stub implementation
    
    Add new SchedulerNG stub implementation, which will represents the future
    default scheduler.
    
    Add feature toggle to switch between existing scheduler and stub
    implementation.
    
    Add ThrowingRestartStrategy to validate that in new scheduling code paths, the
    legacy restart strategies are not used.
    
    This closes #8452.
---
 .../flink/configuration/JobManagerOptions.java     | 13 ++++
 .../dispatcher/DefaultJobManagerRunnerFactory.java |  3 +-
 .../dispatcher/SchedulerNGFactoryFactory.java      | 52 +++++++++++++++
 .../restart/ThrowingRestartStrategy.java           | 53 +++++++++++++++
 .../flink/runtime/scheduler/DefaultScheduler.java  | 78 ++++++++++++++++++++++
 .../runtime/scheduler/DefaultSchedulerFactory.java | 73 ++++++++++++++++++++
 .../dispatcher/SchedulerNGFactoryFactoryTest.java  | 77 +++++++++++++++++++++
 .../ThrowingRestartStrategyFactoryTest.java        | 67 +++++++++++++++++++
 8 files changed, 414 insertions(+), 2 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 69f445f..a1d55b1 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -160,6 +160,19 @@ public class JobManagerOptions {
 			// default matches heartbeat.timeout so that sticky allocation is not lost on timeouts for local recovery
 			.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
 			.withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
+	/**
+	 * Config parameter determining the scheduler implementation.
+	 */
+	@Documentation.ExcludeFromDocumentation("SchedulerNG is still in development.")
+	public static final ConfigOption<String> SCHEDULER =
+		key("jobmanager.scheduler")
+			.defaultValue("legacy")
+			.withDescription(Description.builder()
+				.text("Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
+				.list(
+					text("'legacy': legacy scheduler"),
+					text("'ng': new generation scheduler"))
+				.build());
 
 	// ---------------------------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index b7f80b6..c0707c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -57,8 +57,7 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory {
 
 		final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration);
 		final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration);
-		final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
-			jobManagerServices.getRestartStrategyFactory());
+		final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration, jobManagerServices.getRestartStrategyFactory());
 
 		final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
 			jobMasterConfiguration,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
new file mode 100644
index 0000000..a757bae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
+import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+
+final class SchedulerNGFactoryFactory {
+
+	private SchedulerNGFactoryFactory() {}
+
+	static SchedulerNGFactory createSchedulerNGFactory(
+			final Configuration configuration,
+			final RestartStrategyFactory restartStrategyFactory) {
+
+		final String schedulerName = configuration.getString(JobManagerOptions.SCHEDULER);
+		switch (schedulerName) {
+			case "legacy":
+				return new LegacySchedulerFactory(restartStrategyFactory);
+
+			case "ng":
+				return new DefaultSchedulerFactory();
+
+			default:
+				throw new IllegalArgumentException(String.format(
+					"Illegal value [%s] for config option [%s]",
+					schedulerName,
+					JobManagerOptions.SCHEDULER.key()));
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
new file mode 100644
index 0000000..e7355df
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategy.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+
+
+/**
+ * A restart strategy that validates that it is not in use by throwing {@link IllegalStateException}
+ * on any method call.
+ */
+public class ThrowingRestartStrategy implements RestartStrategy {
+
+	@Override
+	public boolean canRestart() {
+		throw new IllegalStateException("Unexpected canRestart() call");
+	}
+
+	@Override
+	public void restart(final RestartCallback restarter, final ScheduledExecutor executor) {
+		throw new IllegalStateException("Unexpected restart() call");
+	}
+
+	/**
+	 * Factory for {@link ThrowingRestartStrategy}.
+	 */
+	public static class ThrowingRestartStrategyFactory extends RestartStrategyFactory {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public RestartStrategy createRestartStrategy() {
+			return new ThrowingRestartStrategy();
+		}
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
new file mode 100644
index 0000000..427734a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Stub implementation of the future default scheduler.
+ */
+public class DefaultScheduler extends LegacyScheduler {
+
+	public DefaultScheduler(
+			final Logger log,
+			final JobGraph jobGraph,
+			final BackPressureStatsTracker backPressureStatsTracker,
+			final Executor ioExecutor,
+			final Configuration jobMasterConfiguration,
+			final SlotProvider slotProvider,
+			final ScheduledExecutorService futureExecutor,
+			final ClassLoader userCodeLoader,
+			final CheckpointRecoveryFactory checkpointRecoveryFactory,
+			final Time rpcTimeout,
+			final BlobWriter blobWriter,
+			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+			final Time slotRequestTimeout) throws Exception {
+
+		super(
+			log,
+			jobGraph,
+			backPressureStatsTracker,
+			ioExecutor,
+			jobMasterConfiguration,
+			slotProvider,
+			futureExecutor,
+			userCodeLoader,
+			checkpointRecoveryFactory,
+			rpcTimeout,
+			new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
+			blobWriter,
+			jobManagerJobMetricGroup,
+			slotRequestTimeout);
+	}
+
+	@Override
+	public void startScheduling() {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
new file mode 100644
index 0000000..71f8802
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+
+import org.slf4j.Logger;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Factory for {@link DefaultScheduler}.
+ */
+public class DefaultSchedulerFactory implements SchedulerNGFactory {
+
+	@Override
+	public SchedulerNG createInstance(
+			final Logger log,
+			final JobGraph jobGraph,
+			final BackPressureStatsTracker backPressureStatsTracker,
+			final Executor ioExecutor,
+			final Configuration jobMasterConfiguration,
+			final SlotProvider slotProvider,
+			final ScheduledExecutorService futureExecutor,
+			final ClassLoader userCodeLoader,
+			final CheckpointRecoveryFactory checkpointRecoveryFactory,
+			final Time rpcTimeout,
+			final BlobWriter blobWriter,
+			final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+			final Time slotRequestTimeout) throws Exception {
+
+		return new DefaultScheduler(
+			log,
+			jobGraph,
+			backPressureStatsTracker,
+			ioExecutor,
+			jobMasterConfiguration,
+			slotProvider,
+			futureExecutor,
+			userCodeLoader,
+			checkpointRecoveryFactory,
+			rpcTimeout,
+			blobWriter,
+			jobManagerJobMetricGroup,
+			slotRequestTimeout);
+	}
+
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
new file mode 100644
index 0000000..cfaf0f0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
+import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
+import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link SchedulerNGFactory}.
+ */
+public class SchedulerNGFactoryFactoryTest extends TestLogger {
+
+	private static final NoRestartStrategy.NoRestartStrategyFactory TEST_RESTART_STRATEGY_FACTORY = new NoRestartStrategy.NoRestartStrategyFactory();
+
+	@Test
+	public void createLegacySchedulerFactoryByDefault() {
+		final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(new Configuration());
+		assertThat(schedulerNGFactory, is(instanceOf(LegacySchedulerFactory.class)));
+	}
+
+	@Test
+	public void createSchedulerNGFactoryIfConfigured() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULER, "ng");
+
+		final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(configuration);
+
+		assertThat(schedulerNGFactory, is(instanceOf(DefaultSchedulerFactory.class)));
+	}
+
+	@Test
+	public void throwsExceptionIfSchedulerNameIsInvalid() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(JobManagerOptions.SCHEDULER, "invalid-scheduler-name");
+
+		try {
+			createSchedulerNGFactory(configuration);
+		} catch (IllegalArgumentException e) {
+			assertThat(e.getMessage(), containsString("Illegal value [invalid-scheduler-name]"));
+		}
+	}
+
+	private static SchedulerNGFactory createSchedulerNGFactory(final Configuration configuration) {
+		return SchedulerNGFactoryFactory.createSchedulerNGFactory(
+			configuration,
+			TEST_RESTART_STRATEGY_FACTORY);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategyFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategyFactoryTest.java
new file mode 100644
index 0000000..dd4f399
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/ThrowingRestartStrategyFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ThrowingRestartStrategy}.
+ */
+public class ThrowingRestartStrategyFactoryTest extends TestLogger {
+
+	private RestartStrategy restartStrategy;
+
+	@Before
+	public void setUp() {
+		restartStrategy = new ThrowingRestartStrategy();
+	}
+
+	@Test
+	public void restartShouldThrowException() {
+		final ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = new ManuallyTriggeredScheduledExecutor();
+
+		try {
+			restartStrategy.restart(new NoOpRestarter(), manuallyTriggeredScheduledExecutor);
+			fail("Expected exception not thrown");
+		} catch (IllegalStateException e) {
+			assertThat(e.getMessage(), is(equalTo("Unexpected restart() call")));
+			assertThat(manuallyTriggeredScheduledExecutor.numQueuedRunnables(), is(equalTo(0)));
+		}
+	}
+
+	@Test
+	public void canRestartShouldThrowException() {
+		try {
+			restartStrategy.canRestart();
+			fail("Expected exception not thrown");
+		} catch (IllegalStateException e) {
+			assertThat(e.getMessage(), is(equalTo("Unexpected canRestart() call")));
+		}
+	}
+}