You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/01/10 21:17:51 UTC

[beam] 02/02: [BEAM-6248] Add Flink v1.7 build target to Flink Runner (#7300)

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.10.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 19bb8973d54f364b4da06827e2c2c8bda7ed286b
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Jan 10 15:41:38 2019 -0500

    [BEAM-6248] Add Flink v1.7 build target to Flink Runner (#7300)
---
 runners/flink/1.7/build.gradle                     | 33 ++++++++++++++++++++++
 .../flink/1.7/job-server-container/build.gradle    | 26 +++++++++++++++++
 runners/flink/1.7/job-server/build.gradle          | 30 ++++++++++++++++++++
 .../runners/flink/FlinkExecutionEnvironments.java  |  5 +++-
 .../beam/runners/flink/PortableExecutionTest.java  |  1 +
 .../runners/flink/PortableStateExecutionTest.java  |  1 +
 .../runners/flink/PortableTimersExecutionTest.java |  1 +
 settings.gradle                                    |  7 +++++
 8 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/runners/flink/1.7/build.gradle b/runners/flink/1.7/build.gradle
new file mode 100644
index 0000000..5982c1b
--- /dev/null
+++ b/runners/flink/1.7/build.gradle
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '..'
+
+/* All properties required for loading the Flink build script */
+project.ext {
+  // Set the version of all Flink-related dependencies here.
+  flink_version = '1.7.1'
+  // Look for the source code in the parent module
+  main_source_dirs = ["$basePath/src/main/java"]
+  test_source_dirs = ["$basePath/src/test/java"]
+  main_resources_dirs = ["$basePath/src/main/resources"]
+  test_resources_dirs = ["$basePath/src/test/resources"]
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_runner.gradle"
diff --git a/runners/flink/1.7/job-server-container/build.gradle b/runners/flink/1.7/job-server-container/build.gradle
new file mode 100644
index 0000000..afdb68a
--- /dev/null
+++ b/runners/flink/1.7/job-server-container/build.gradle
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server-container'
+
+project.ext {
+  resource_path = basePath
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server_container.gradle"
diff --git a/runners/flink/1.7/job-server/build.gradle b/runners/flink/1.7/job-server/build.gradle
new file mode 100644
index 0000000..d5251e0
--- /dev/null
+++ b/runners/flink/1.7/job-server/build.gradle
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+def basePath = '../../job-server'
+
+project.ext {
+  // Look for the source code in the shared job-server module (runners/flink/job-server)
+  main_source_dirs = ["$basePath/src/main/java"]
+  test_source_dirs = ["$basePath/src/test/java"]
+  main_resources_dirs = ["$basePath/src/main/resources"]
+  test_resources_dirs = ["$basePath/src/test/resources"]
+}
+
+// Load the main build script which contains all build logic.
+apply from: "$basePath/flink_job_server.gradle"
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index f8108c9..f2eb538 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -338,7 +338,10 @@ public class FlinkExecutionEnvironments {
 
       final ClusterClient<?> client;
       try {
-        if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
+        // Spell out the option key and its values as string literals to stay compatible
+        // across Flink versions: CoreOptions.MODE and its values CoreOptions.LEGACY_MODE
+        // and CoreOptions.NEW_MODE have been removed in newer Flink versions.
+        if ("legacy".equals(configuration.getString("mode", "new"))) {
           client = new StandaloneClusterClient(configuration);
         } else {
           client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 9542bdd..f82a4f4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -85,6 +85,7 @@ public class PortableExecutionTest implements Serializable {
     options.setRunner(CrashingRunner.class);
     options.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
     options.as(FlinkPipelineOptions.class).setStreaming(isStreaming);
+    options.as(FlinkPipelineOptions.class).setParallelism(2);
     options
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index a658a1c..7ed7312 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -86,6 +86,7 @@ public class PortableStateExecutionTest implements Serializable {
     options.setRunner(CrashingRunner.class);
     options.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
     options.as(FlinkPipelineOptions.class).setStreaming(isStreaming);
+    options.as(FlinkPipelineOptions.class).setParallelism(2);
     options
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index d9639b8..152a76a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -92,6 +92,7 @@ public class PortableTimersExecutionTest implements Serializable {
     options.setRunner(CrashingRunner.class);
     options.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
     options.as(FlinkPipelineOptions.class).setStreaming(isStreaming);
+    options.as(FlinkPipelineOptions.class).setParallelism(2);
     options
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
diff --git a/settings.gradle b/settings.gradle
index aac5bf9..c9e3f4c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -53,6 +53,13 @@ include "beam-runners-flink-1.6-job-server"
 project(":beam-runners-flink-1.6-job-server").dir = file("runners/flink/1.6/job-server")
 include "beam-runners-flink-1.6-job-server-container"
 project(":beam-runners-flink-1.6-job-server-container").dir = file("runners/flink/1.6/job-server-container")
+// 1.7 version
+include "beam-runners-flink-1.7"
+project(":beam-runners-flink-1.7").dir = file("runners/flink/1.7")
+include "beam-runners-flink-1.7-job-server"
+project(":beam-runners-flink-1.7-job-server").dir = file("runners/flink/1.7/job-server")
+include "beam-runners-flink-1.7-job-server-container"
+project(":beam-runners-flink-1.7-job-server-container").dir = file("runners/flink/1.7/job-server-container")
 /* End Flink Runner related settings */
 include "beam-runners-gcp-gcemd"
 project(":beam-runners-gcp-gcemd").dir = file("runners/gcp/gcemd")