You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/04/02 12:55:29 UTC
[GitHub] flink pull request #5801: [FLINK-9121] [flip6] Remove Flip6 prefixes and oth...
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/5801
[FLINK-9121] [flip6] Remove Flip6 prefixes and other references
## What is the purpose of the change
Remove Flip6 prefixes and references to make Flip-6 the proper default:
Rename categories Flip6 -> New and OldAndFlip6 -> LegacyAndNew
Remove Flip-6 from documentation
Remove Flip-6 from start up scripts
## Verifying this change
- Covered by existing tests
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink removeFlip6
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5801.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5801
----
commit 82969a9fda2b8132abebf50b69d6bf0b055a97cd
Author: Till Rohrmann <tr...@...>
Date: 2018-04-02T09:13:57Z
[FLINK-9121] [flip6] Remove Flip6 prefixes and other references
Remove Flip6 prefixes and references to make Flip-6 the proper default:
Rename categories Flip6 -> New and OldAndFlip6 -> LegacyAndNew
Remove Flip-6 from documentation
Remove Flip-6 from start up scripts
----
---
[GitHub] flink pull request #5801: [FLINK-9121] [flip6] Remove Flip6 prefixes and oth...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5801#discussion_r203352693
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---
@@ -49,30 +54,45 @@ public YarnClusterDescriptor(
@Override
protected String getYarnSessionClusterEntrypoint() {
- return YarnApplicationMasterRunner.class.getName();
+ return YarnSessionClusterEntrypoint.class.getName();
}
@Override
protected String getYarnJobClusterEntrypoint() {
- throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode.");
+ return YarnJobClusterEntrypoint.class.getName();
}
@Override
- public YarnClusterClient deployJobCluster(
- ClusterSpecification clusterSpecification,
- JobGraph jobGraph,
- boolean detached) {
- throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
+ public ClusterClient<ApplicationId> deployJobCluster(
+ ClusterSpecification clusterSpecification,
+ JobGraph jobGraph,
+ boolean detached) throws ClusterDeploymentException {
+
+ // this is required because the slots are allocated lazily
+ jobGraph.setAllowQueuedScheduling(true);
+
+ try {
+ return deployInternal(
+ clusterSpecification,
+ "Flink per-job cluster",
+ getYarnJobClusterEntrypoint(),
+ jobGraph,
+ detached);
+ } catch (Exception e) {
+ throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
+ }
}
@Override
- protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
- return new YarnClusterClient(
- descriptor,
- numberTaskManagers,
- slotsPerTaskManager,
- report,
+ protected ClusterClient<ApplicationId> createYarnClusterClient(
+ AbstractYarnClusterDescriptor descriptor,
+ int numberTaskManagers,
+ int slotsPerTaskManager,
+ ApplicationReport report,
+ Configuration flinkConfiguration,
+ boolean perJobCluster) throws Exception {
+ return new RestClusterClient<>(
--- End diff --
Because the new Flink architecture will use REST calls for the client-server communication. This is unlike how it was done in the legacy architecture (pre Flip-6).
---
[GitHub] flink pull request #5801: [FLINK-9121] [flip6] Remove Flip6 prefixes and oth...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5801
---
[GitHub] flink pull request #5801: [FLINK-9121] [flip6] Remove Flip6 prefixes and oth...
Posted by Matrix42 <gi...@git.apache.org>.
Github user Matrix42 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5801#discussion_r202548776
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---
@@ -49,30 +54,45 @@ public YarnClusterDescriptor(
@Override
protected String getYarnSessionClusterEntrypoint() {
- return YarnApplicationMasterRunner.class.getName();
+ return YarnSessionClusterEntrypoint.class.getName();
}
@Override
protected String getYarnJobClusterEntrypoint() {
- throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode.");
+ return YarnJobClusterEntrypoint.class.getName();
}
@Override
- public YarnClusterClient deployJobCluster(
- ClusterSpecification clusterSpecification,
- JobGraph jobGraph,
- boolean detached) {
- throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
+ public ClusterClient<ApplicationId> deployJobCluster(
+ ClusterSpecification clusterSpecification,
+ JobGraph jobGraph,
+ boolean detached) throws ClusterDeploymentException {
+
+ // this is required because the slots are allocated lazily
+ jobGraph.setAllowQueuedScheduling(true);
+
+ try {
+ return deployInternal(
+ clusterSpecification,
+ "Flink per-job cluster",
+ getYarnJobClusterEntrypoint(),
+ jobGraph,
+ detached);
+ } catch (Exception e) {
+ throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
+ }
}
@Override
- protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
- return new YarnClusterClient(
- descriptor,
- numberTaskManagers,
- slotsPerTaskManager,
- report,
+ protected ClusterClient<ApplicationId> createYarnClusterClient(
+ AbstractYarnClusterDescriptor descriptor,
+ int numberTaskManagers,
+ int slotsPerTaskManager,
+ ApplicationReport report,
+ Configuration flinkConfiguration,
+ boolean perJobCluster) throws Exception {
+ return new RestClusterClient<>(
--- End diff --
why don't return a YarnClusterClient here?
---