Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/13 01:53:44 UTC
[GitHub] [beam] nevillelyh opened a new pull request #12564: [BEAM-10612] add Flink 1.11 runner
nevillelyh opened a new pull request #12564:
URL: https://github.com/apache/beam/pull/12564
WIP
Almost there, except for the following error:
```
> Task :runners:flink:1.11:compileJava
/home/neville/src/apache/beam-git/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:702: error: timeServiceManager has private access in AbstractStreamOperator
timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
```
I tried working around it by forking the test and backporting the `StreamOperatorStateContext.internalTimerServiceManager` logic from upstream, but ran into other issues, including checkstyle errors about a missing `package-info.java` and test failures.
@mxm any ideas?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674377804
Run Python PreCommit
----------------------------------------------------------------
[GitHub] [beam] entimaniac commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
entimaniac commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-692709810
Thanks! That was all the information I was looking for.
----------------------------------------------------------------
[GitHub] [beam] nevillelyh commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
nevillelyh commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r470145753
##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.CoreMatchers.allOf;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkRunner}.
+ *
+ * <p>This test is copied to 1.10 is becauses the signature of the method getPipeline in
+ * OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to
+ * https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details.
+ */
+public class FlinkRunnerTest {
Review comment:
@mxm I addressed most of the code-copy issues by adding compat layers. `./gradlew -p runners/flink test` passes for all versions, but `build` fails checkstyle with a missing `package-info.java` error, even though the file exists in the base `src/main` dir. Any idea how to fix this?
I'll look at the docs next. Not sure about the CI test scripts though.
```
> Task :runners:flink:1.10:checkstyleMain FAILED
[ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.10/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
> Task :runners:flink:1.8:checkstyleMain FAILED
[ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.8/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
> Task :runners:flink:1.9:checkstyleMain
[ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.9/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
> Task :runners:flink:1.9:checkstyleMain FAILED
> Task :runners:flink:1.11:checkstyleMain
[ant:checkstyle] [ERROR] /home/neville/src/apache/beam/runners/flink/1.11/build/source-overrides/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/AbstractStreamOperatorCompat.java:1: Missing package-info.java file. [JavadocPackage]
```
----------------------------------------------------------------
[GitHub] [beam] entimaniac commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
entimaniac commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-692685878
Hey, I know this is a closed PR, but I'm hoping someone can help me. I'm trying to understand how I can take advantage of this PR. The environment I'm trying to run my Beam job on has Flink 1.11 installed, but the only runner package I see available is for 1.10 (https://mvnrepository.com/artifact/org.apache.beam/beam-runners-flink-1.10). Is there another way of using this code?
----------------------------------------------------------------
[GitHub] [beam] nevillelyh commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
nevillelyh commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674178691
Squashed & made `timeServiceManager` a field. Had to rename it though to avoid conflict with <= 1.10.
----------------------------------------------------------------
[GitHub] [beam] nevillelyh commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
nevillelyh commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r469992976
##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
##########
@@ -0,0 +1,100 @@
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.CoreMatchers.allOf;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkRunner}.
+ *
+ * <p>This test is copied to 1.10 is becauses the signature of the method getPipeline in
+ * OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to
+ * https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details.
+ */
+public class FlinkRunnerTest {
Review comment:
Yeah should be doable. I'll add a shim.
Also, I totally missed `getTimeServiceManager`. My IDEA is still glitchy with the setup; I need to get used to the multi-version code sharing flow :joy:
----------------------------------------------------------------
[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-692699588
Support for Flink 1.11 will be included in Beam 2.25.0. Until then,
you'll have to manually build the jar from source using the following
command:
./gradlew :runners:flink:1.11:jar
----------------------------------------------------------------
[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674186001
----------------------------------------------------------------
[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674372723
retest this please
----------------------------------------------------------------
[GitHub] [beam] mxm merged pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm merged pull request #12564:
URL: https://github.com/apache/beam/pull/12564
----------------------------------------------------------------
[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674185907
Run PythonDocker PreCommit
----------------------------------------------------------------
[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674205720
----------------------------------------------------------------
[GitHub] [beam] mxm commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r470543371
##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+
+/** A {@link MiniCluster} which allows remote connections for the end-to-end test. */
+public class RemoteMiniClusterImpl extends RemoteMiniCluster {
+
+ public RemoteMiniClusterImpl(MiniClusterConfiguration miniClusterConfiguration) {
+ super(miniClusterConfiguration);
+ }
+
+ @Override
+ public int getClusterPort() {
+ return getClusterInformation().getBlobServerPort();
Review comment:
Is it really the blob server port here or do we want the Job server RPC port?
##########
File path: runners/flink/1.8/build.gradle
##########
@@ -17,7 +17,6 @@
*/
def basePath = '..'
-
Review comment:
nit: unrelated change
##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
##########
@@ -0,0 +1,100 @@
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.CoreMatchers.allOf;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkRunner}.
+ *
+ * <p>This test is copied to 1.10 is becauses the signature of the method getPipeline in
+ * OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to
+ * https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details.
+ */
+public class FlinkRunnerTest {
Review comment:
Thanks!
If the problem doesn't go away after running `./gradlew -p runners/flink clean`, then you might have to add a suppression here: https://github.com/apache/beam/blob/02bf081d0e86f16395af415cebee2812620aff4b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml#L111
----------------------------------------------------------------
[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674378273
Thanks for making this happen!
----------------------------------------------------------------
[GitHub] [beam] mxm commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r470732067
##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -699,7 +699,7 @@ public final void processWatermark1(Watermark mark) throws Exception {
long inputWatermarkHold = applyInputWatermarkHold(getEffectiveInputWatermark());
if (keyCoder != null) {
- timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
+ getTimeServiceManagerCompat().advanceWatermark(new Watermark(inputWatermarkHold));
Review comment:
Maybe add a field for this instead of calling the getter every time?
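A minimal sketch of the field suggestion, using stand-in types rather than Flink's real classes. `TimerManager`, `OperatorCompat`, and `CachingOperator` are hypothetical names invented for illustration; only `getTimeServiceManagerCompat()` and `advanceWatermark` appear in the diff above, and their real signatures may differ. The getter is called once during setup, and the result is kept in a field whose name does not clash with anything in the superclass.

```java
class CompatFieldDemo {
  // Stand-in for Flink's timer service manager; only the call used in the diff is modeled.
  interface TimerManager {
    void advanceWatermark(long timestamp);
  }

  // Hypothetical per-version compat base class. Each Flink version directory
  // would supply its own implementation of the getter; this body is invented.
  abstract static class OperatorCompat {
    private final TimerManager manager;
    int getterCalls = 0; // counts lookups so the effect of caching is visible

    OperatorCompat(TimerManager manager) {
      this.manager = manager;
    }

    TimerManager getTimeServiceManagerCompat() {
      getterCalls++;
      return manager;
    }
  }

  // Operator that resolves the manager once and reuses it for every watermark.
  static class CachingOperator extends OperatorCompat {
    // Deliberately not named like the superclass field, to avoid a name clash.
    private TimerManager timerManagerCompat;

    CachingOperator(TimerManager manager) {
      super(manager);
    }

    void open() {
      timerManagerCompat = getTimeServiceManagerCompat();
    }

    void processWatermark(long hold) {
      timerManagerCompat.advanceWatermark(hold);
    }
  }

  // Returns how many times the getter ran after processing the given number of watermarks.
  static int lookupsAfter(int watermarks) {
    long[] last = new long[1];
    CachingOperator op = new CachingOperator(ts -> last[0] = ts);
    op.open();
    for (int i = 0; i < watermarks; i++) {
      op.processWatermark(i);
    }
    return op.getterCalls;
  }

  public static void main(String[] args) {
    System.out.println(lookupsAfter(3));
  }
}
```

With the cached field, the number of getter calls stays at one no matter how many watermarks are processed.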
----------------------------------------------------------------
[GitHub] [beam] mxm commented on pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #12564:
URL: https://github.com/apache/beam/pull/12564#issuecomment-674377842
Unrelated test failure in Python:
```
Test Result (2 failures / +1)
apache_beam.transforms.ptransform_test.PTransformTypeCheckTestCase.test_pardo_properly_type_checks_using_type_hint_decorators
```
https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/14588/
----------------------------------------------------------------
[GitHub] [beam] mxm commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r469758128
##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/FlinkRunnerTest.java
##########
@@ -0,0 +1,100 @@
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.CoreMatchers.allOf;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkRunner}.
+ *
+ * <p>This test is copied to 1.10 because the signature of the method getPipeline in
+ * OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to
+ * https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details.
+ */
+public class FlinkRunnerTest {
Review comment:
Do you think we could avoid duplicating all this code? If the signature changed, we should rather define an interface to retrieve the pipeline/construct the environment, which each Flink version can implement. Alternatively, reflection is also a valid option in tests.
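As a rough illustration of the reflection option, here is a minimal sketch. The classes `OldEnvironment` and `NewEnvironment` and both `getPipeline` signatures are hypothetical stand-ins for two versions of an API; they are not the actual Flink `OptimizerPlanEnvironment` signatures. The idea is that a single shared test helper looks the method up by name at runtime, so the test itself does not need to be copied per version.

```java
import java.lang.reflect.Method;

public class ReflectiveCallSketch {

  /** Hypothetical stand-in for an older API version: no-argument method. */
  public static class OldEnvironment {
    public String getPipeline() {
      return "pipeline-from-old-api";
    }
  }

  /** Hypothetical stand-in for a newer API version: same name, extra argument. */
  public static class NewEnvironment {
    public String getPipeline(String jobName) {
      return "pipeline-from-new-api:" + jobName;
    }
  }

  /**
   * Calls whichever public getPipeline variant the given object exposes.
   * The jobName argument is only passed on when the method accepts a String.
   */
  public static String getPipeline(Object environment, String jobName)
      throws ReflectiveOperationException {
    for (Method method : environment.getClass().getMethods()) {
      if (!method.getName().equals("getPipeline")) {
        continue;
      }
      if (method.getParameterCount() == 0) {
        return (String) method.invoke(environment);
      }
      if (method.getParameterCount() == 1
          && method.getParameterTypes()[0] == String.class) {
        return (String) method.invoke(environment, jobName);
      }
    }
    throw new NoSuchMethodException("No supported getPipeline variant found");
  }

  public static void main(String[] args) throws Exception {
    System.out.println(getPipeline(new OldEnvironment(), "job"));
    System.out.println(getPipeline(new NewEnvironment(), "job"));
  }
}
```

The interface option would instead keep the test in the shared source set and have each version directory provide a small implementation, which is more type-safe but needs one extra class per version.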
[GitHub] [beam] nevillelyh commented on a change in pull request #12564: [BEAM-10612] add Flink 1.11 runner
Posted by GitBox <gi...@apache.org>.
nevillelyh commented on a change in pull request #12564:
URL: https://github.com/apache/beam/pull/12564#discussion_r470700571
##########
File path: runners/flink/1.11/src/test/java/org/apache/beam/runners/flink/RemoteMiniClusterImpl.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+
+/** A {@link MiniCluster} which allows remote connections for the end-to-end test. */
+public class RemoteMiniClusterImpl extends RemoteMiniCluster {
+
+ public RemoteMiniClusterImpl(MiniClusterConfiguration miniClusterConfiguration) {
+ super(miniClusterConfiguration);
+ }
+
+ @Override
+ public int getClusterPort() {
+ return getClusterInformation().getBlobServerPort();
Review comment:
Yeah I really monkeyed that one :see_no_evil:
If I understood it correctly, it's a workaround to expose the port in `useSingleRpcService` mode:
https://github.com/apache/flink/blob/release-1.10.1/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L270
That was changed to `createLocalRpcService` in 1.11:
https://github.com/apache/flink/blob/release-1.11.1/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java#L271
Pushed a fix for this.