Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/13 01:53:44 UTC

[GitHub] [beam] nevillelyh opened a new pull request #12564: [BEAM-10612] add Flink 1.11 runner

nevillelyh opened a new pull request #12564:
URL: https://github.com/apache/beam/pull/12564


   WIP
   
   Almost there except the following error:
   
   ```
   > Task :runners:flink:1.11:compileJava
   /home/neville/src/apache/beam-git/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:702: error: timeServiceManager has private access in AbstractStreamOperator
         timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
   ```
   
   I tried working around it by forking the test and backporting the `StreamOperatorStateContext.internalTimerServiceManager` logic from upstream, but ran into other issues, including checkstyle errors about a missing `package-info.java` and test failures.
   
   @mxm any ideas?
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674377804


   Run Python PreCommit





[GitHub] [beam] entimaniac commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
entimaniac commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-692709810


   thanks! that was all the information I was looking to hear





[GitHub] [beam] nevillelyh commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
nevillelyh commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r470145753



##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.CoreMatchers.allOf;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkRunner}.
+ *
+ * <p>This test is copied to 1.10 is becauses the signature of the method getPipeline in
+ * OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to
+ * https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details.
+ */
+public class FlinkRunnerTest {

Review comment:
       @mxm addressed most code copy issues by adding compat layers. `./gradlew -p runners/flink test` passes for all versions but `build` fails checkstyle due to missing `package-info.java`, even though it exists in the base `src/main` dir. Any idea how to fix?
   
   I'll look at the docs next. Not sure about the CI test scripts though.
   
   ```
   > Task :runners:flink:1.10:checkstyleMain FAILED
   [ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.10/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
   
   > Task :runners:flink:1.8:checkstyleMain FAILED
   [ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.8/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
   
   > Task :runners:flink:1.9:checkstyleMain
   [ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.9/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
   
   > Task :runners:flink:1.9:checkstyleMain FAILED
   
   > Task :runners:flink:1.11:checkstyleMain
   [ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.11/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
   ```
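
The compat-layer idea mentioned in this comment can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `TimerManager`, `OldBaseOperator`, `NewBaseOperator`, `OldCompat`, `NewCompat` and `TimerAccess` are hypothetical stand-ins so the example compiles without Flink on the classpath. The only names taken from the thread are `timeServiceManager`, `getTimeServiceManager` and `getTimeServiceManagerCompat`. In the real build each Flink version would have its own source directory with one compat class; here both variants live side by side in one file.

```java
public class CompatShimSketch {

  /** Hypothetical stand-in for the timer service manager type. */
  public static class TimerManager {
    public long lastWatermark = Long.MIN_VALUE;

    public void advanceWatermark(long timestamp) {
      lastWatermark = timestamp;
    }
  }

  /** Stand-in for an older base operator: the field is visible to subclasses. */
  public abstract static class OldBaseOperator {
    protected TimerManager timeServiceManager = new TimerManager();
  }

  /** Stand-in for a newer base operator: the field is private, with a getter. */
  public abstract static class NewBaseOperator {
    private final TimerManager timeServiceManager = new TimerManager();

    protected TimerManager getTimeServiceManager() {
      return timeServiceManager;
    }
  }

  /** Uniform accessor that version-independent code programs against. */
  public interface TimerAccess {
    TimerManager getTimeServiceManagerCompat();
  }

  /** Compat class that would live in the older version's source directory. */
  public static class OldCompat extends OldBaseOperator implements TimerAccess {
    @Override
    public TimerManager getTimeServiceManagerCompat() {
      return timeServiceManager; // direct field access still compiles here
    }
  }

  /** Compat class that would live in the newer version's source directory. */
  public static class NewCompat extends NewBaseOperator implements TimerAccess {
    @Override
    public TimerManager getTimeServiceManagerCompat() {
      return getTimeServiceManager(); // field is private, so go through the getter
    }
  }

  /** Shared logic: never touches the field directly, only the compat accessor. */
  public static long advance(TimerAccess operator, long watermark) {
    TimerManager manager = operator.getTimeServiceManagerCompat();
    manager.advanceWatermark(watermark);
    return manager.lastWatermark;
  }

  public static void main(String[] args) {
    System.out.println(advance(new OldCompat(), 10L));
    System.out.println(advance(new NewCompat(), 20L));
  }
}
```

The point of the pattern is that the shared operator code compiles unchanged against every version, while only the small compat class differs per version.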







[GitHub] [beam] entimaniac commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
entimaniac commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-692685878


   Hey, I know this is a closed PR, but I'm hoping someone can help me. I'm trying to understand how I can take advantage of it. The environment I'm trying to run my Beam job on has Flink 1.11 installed, but the only runner package I see available is for 1.10 (https://mvnrepository.com/artifact/org.apache.beam/beam-runners-flink-1.10). Is there another way of using this code?





[GitHub] [beam] nevillelyh commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
nevillelyh commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674178691


   Squashed & made `timeServiceManager` a field. Had to rename it though to avoid conflict with <= 1.10.





[GitHub] [beam] nevillelyh commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
nevillelyh commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r469992976



##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.CoreMatchers.allOf;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkRunner}.
+ *
+ * <p>This test is copied to 1.10 is becauses the signature of the method getPipeline in
+ * OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to
+ * https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details.
+ */
+public class FlinkRunnerTest {

Review comment:
       Yeah should be doable. I'll add a shim.
   Also totally missed `getTimeServiceManager`. My IDEA is still glitchy with the setup; I need to get used to the multi-version code-sharing flow :joy:







[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-692699588


   Support for Flink 1.11 will be included in Beam 2.25.0. Until then, 
   you'll have to manually build the jar from source using the following 
   command:
   
        ./gradlew :runners:flink:1.11:jar
   





[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674186001









[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674372723


   retest this please





[GitHub] [beam] mxm merged pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm merged pull request #12564:
URL: https://github.com/apache/beam/pull/12564


   





[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674185907


   Run PythonDocker PreCommit





[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674205720









[GitHub] [beam] mxm commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r470543371



##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+
+/** A {@link MiniCluster} which allows remote connections for the end-to-end test. */
+public class RemoteMiniClusterImpl extends RemoteMiniCluster {
+
+  public RemoteMiniClusterImpl(MiniClusterConfiguration miniClusterConfiguration) {
+    super(miniClusterConfiguration);
+  }
+
+  @Override
+  public int getClusterPort() {
+    return getClusterInformation().getBlobServerPort();

Review comment:
       Is it really the blob server port here or do we want the Job server RPC port?

##########
File path: runners/flink/1.8/build.gradle
##########
@@ -17,7 +17,6 @@
  */
 
 def basePath = '..'
-

Review comment:
       nit: unrelated change

##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.CoreMatchers.allOf;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkRunner}.
+ *
+ * <p>This test is copied to 1.10 is becauses the signature of the method getPipeline in
+ * OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to
+ * https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details.
+ */
+public class FlinkRunnerTest {

Review comment:
       Thanks!
   
   If the problem doesn't go away after running `./gradlew -p runners/flink clean`, then you might have to add a suppression here: https://github.com/apache/beam/blob/02bf081d0e86f16395af415cebee2812620aff4b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml#L111







[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674378273


   Thanks for making this happen!





[GitHub] [beam] mxm commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r470732067



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -699,7 +699,7 @@ public final void processWatermark1(Watermark mark) throws Exception {
 
     long inputWatermarkHold = applyInputWatermarkHold(getEffectiveInputWatermark());
     if (keyCoder != null) {
-      timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
+      getTimeServiceManagerCompat().advanceWatermark(new Watermark(inputWatermarkHold));

Review comment:
       Maybe add a field for this instead of calling the getter every time?
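
One way to read this suggestion, as a rough sketch only: resolve the manager once and keep it in a field, so the watermark path does not go through the compat getter on every call. Everything below is a hypothetical stand-in (`TimerManager`, `CompatBase`, `Operator`, the field name `cachedTimerManager`, and the `getterCalls` counter, which exists only to make the effect observable); only `getTimeServiceManagerCompat` and `advanceWatermark` are names from the diff above. The field deliberately avoids the name `timeServiceManager`, since the thread notes that name conflicts with the base class in Flink <= 1.10.

```java
public class CachedAccessorSketch {

  /** Hypothetical stand-in for the timer service manager type. */
  public static class TimerManager {
    public long lastWatermark = Long.MIN_VALUE;

    public void advanceWatermark(long timestamp) {
      lastWatermark = timestamp;
    }
  }

  /** Hypothetical stand-in for the version-specific compat base class. */
  public abstract static class CompatBase {
    private final TimerManager manager = new TimerManager();

    /** Counts getter invocations so the caching effect can be checked. */
    public int getterCalls = 0;

    protected TimerManager getTimeServiceManagerCompat() {
      getterCalls++;
      return manager;
    }
  }

  /** Operator that looks the manager up once and reuses it afterwards. */
  public static class Operator extends CompatBase {
    // Named differently from the base-class field to avoid a clash.
    private TimerManager cachedTimerManager;

    /** Resolve the manager once, during setup. */
    public void open() {
      cachedTimerManager = getTimeServiceManagerCompat();
    }

    /** Hot path: uses the cached field instead of calling the getter again. */
    public void processWatermark(long watermark) {
      cachedTimerManager.advanceWatermark(watermark);
    }

    public long currentWatermark() {
      return cachedTimerManager.lastWatermark;
    }
  }

  public static void main(String[] args) {
    Operator operator = new Operator();
    operator.open();
    operator.processWatermark(1L);
    operator.processWatermark(2L);
    System.out.println(operator.getterCalls);
    System.out.println(operator.currentWatermark());
  }
}
```

Whether caching is safe depends on the manager being fully initialized before the field is assigned, which is why the sketch assigns it in a separate setup method rather than in a constructor.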







[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674377842


   Unrelated test failure in Python: 
   
   ```
   Test Result (2 failures / +1)
   
       apache_beam.transforms.ptransform_test.PTransformTypeCheckTestCase.test_pardo_properly_type_checks_using_type_hint_decorators
   ```
   
   https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/14588/





[GitHub] [beam] mxm commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r469758128



##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.CoreMatchers.allOf;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkRunner}.
+ *
+ * <p>This test is copied to 1.10 because the signature of the method getPipeline in
+ * OptimizerPlanEnvironment was changed in Flink 1.10; please refer to
+ * https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details.
+ */
+public class FlinkRunnerTest {

Review comment:
       Do you think we could avoid duplicating all this code? If the signature changed, we should instead define an interface for retrieving the pipeline and constructing the environment, which each version can implement. Alternatively, reflection is also a valid option in tests.
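
A rough sketch of the interface approach described above, assuming two API versions whose `getPipeline` signatures differ. `OldPlanApi`, `NewPlanApi`, `PlanExtractor`, and the extra `suppressOutput` argument are all hypothetical and do not mirror the real `OptimizerPlanEnvironment` signatures; the point is only that the shared test logic depends on the interface, so it can live in one place.

```java
/** Hypothetical older API: one-argument method. */
class OldPlanApi {
  String getPipeline(String program) {
    return "plan(" + program + ")";
  }
}

/** Hypothetical newer API: same method name, extra argument. */
class NewPlanApi {
  String getPipeline(String program, boolean suppressOutput) {
    return "plan(" + program + ")";
  }
}

/** Seam that the shared test code depends on instead of either API. */
interface PlanExtractor {
  String extractPlan(String program);
}

/** Would live in the module for the older version. */
class OldPlanExtractor implements PlanExtractor {
  @Override
  public String extractPlan(String program) {
    return new OldPlanApi().getPipeline(program);
  }
}

/** Would live in the module for the newer version. */
class NewPlanExtractor implements PlanExtractor {
  @Override
  public String extractPlan(String program) {
    return new NewPlanApi().getPipeline(program, true);
  }
}

/** Shared check, written once and reused by every version module. */
class SharedPlanCheck {
  static boolean planContains(PlanExtractor extractor, String program, String expected) {
    return extractor.extractPlan(program).contains(expected);
  }
}

class PlanExtractorDemo {
  public static void main(String[] args) {
    System.out.println(SharedPlanCheck.planContains(new OldPlanExtractor(), "wordcount", "wordcount"));
    System.out.println(SharedPlanCheck.planContains(new NewPlanExtractor(), "wordcount", "wordcount"));
  }
}
```

With this shape, each version directory only carries a small adapter class rather than a full copy of the test.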







[GitHub] [beam] nevillelyh commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner

Posted by GitBox <gi...@apache.org>.
nevillelyh commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r470700571



##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+
+/** A {@link MiniCluster} which allows remote connections for the end-to-end test. */
+public class RemoteMiniClusterImpl extends RemoteMiniCluster {
+
+  public RemoteMiniClusterImpl(MiniClusterConfiguration miniClusterConfiguration) {
+    super(miniClusterConfiguration);
+  }
+
+  @Override
+  public int getClusterPort() {
+    return getClusterInformation().getBlobServerPort();

Review comment:
       Yeah I really monkeyed that one :see_no_evil:
   
   If I understood it correctly, it's a workaround to expose the port in `useSingleRpcService` mode:
   https://github.com/apache/flink/blob/release-1.10.1/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L270
   
   That was changed to `createLocalRpcService` in 1.11:
   https://github.com/apache/flink/blob/release-1.11.1/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L271
   
   Pushed a fix for this.



