Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/09 10:20:38 UTC

[GitHub] [flink] infoverload opened a new pull request, #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

infoverload opened a new pull request, #19414:
URL: https://github.com/apache/flink/pull/19414

   ## What is the purpose of the change
   
   Update deprecated operators in the e2e tests, see [FLINK-26389](https://issues.apache.org/jira/browse/FLINK-26389).
   
   ## Brief change log
   
   Many operators used in `flink-end-to-end-tests` have been deprecated and need to be updated. More details are listed in the JIRA issue.
   
   
   ## Verifying this change
   
   This change is a code cleanup of the existing e2e tests.
   
   This change is already covered by existing tests, namely the e2e tests that run on CI.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

Posted by GitBox <gi...@apache.org>.
infoverload commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847354659


##########
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java:
##########
@@ -65,7 +67,11 @@ public static void main(String[] args) throws Exception {
                                 Files.size(inputFile),
                                 inputDir.toAbsolutePath().toString(),
                                 containedFile.getFileName().toString()))
-                .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+                .sinkTo(
+                        FileSink.forRowFormat(
+                                        new org.apache.flink.core.fs.Path(outputPath),

Review Comment:
   Turns out I am using both! 





[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

Posted by GitBox <gi...@apache.org>.
infoverload commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847355828


##########
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
##########
@@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 
     private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt)
             throws Exception {
+        Duration maxOutOfOrderness = extractTimestamp(pt);
         KeyedStream<Event, Integer> source =
                 env.addSource(createEventSource(pt))
                         .name("EventSource")
                         .uid("EventSource")
-                        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        .assignTimestampsAndWatermarks(
+                                WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment:
   Are you sure? Because the `Duration maxOutOfOrderness = extractTimestamp(pt);` above already has the timestamp 





[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847359774


##########
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
##########
@@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 
     private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt)
             throws Exception {
+        Duration maxOutOfOrderness = extractTimestamp(pt);
         KeyedStream<Event, Integer> source =
                 env.addSource(createEventSource(pt))
                         .name("EventSource")
                         .uid("EventSource")
-                        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        .assignTimestampsAndWatermarks(
+                                WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment:
   I mean the timestamp extractor from the record
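   To illustrate the distinction being made here: `WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness)` only fixes how far the watermark may trail the largest timestamp seen so far; the event time of each record still has to be read from the record itself, which in Flink is the job of a timestamp assigner. The sketch below is a plain-Java model with no Flink dependency so that it runs on its own. `Event` and its `eventTime` field are hypothetical stand-ins for the test's event type, and the formula `maxTimestamp - outOfOrderness - 1` is assumed to match what Flink's `BoundedOutOfOrdernessWatermarks` emits; it is not stated anywhere in this thread.

```java
import java.time.Duration;
import java.util.function.ToLongFunction;

/**
 * Plain-Java model (no Flink dependency) of a bounded-out-of-orderness
 * watermark strategy: timestamps come from each record, and the watermark
 * trails the largest timestamp seen so far by a fixed bound.
 */
public class BoundedOutOfOrdernessModel {

    /** Hypothetical stand-in for the test's Event type. */
    record Event(int key, long eventTime) {}

    static final class WatermarkModel<T> {
        private final ToLongFunction<T> timestampOf; // reads the event time from the record
        private final long outOfOrdernessMillis;
        private long maxTimestamp;

        WatermarkModel(ToLongFunction<T> timestampOf, Duration maxOutOfOrderness) {
            this.timestampOf = timestampOf;
            this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
            // Start low enough that the initial watermark does not underflow.
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }

        /** Per record: extract its timestamp and remember the maximum seen so far. */
        long onEvent(T event) {
            long ts = timestampOf.applyAsLong(event);
            maxTimestamp = Math.max(maxTimestamp, ts);
            return ts;
        }

        /** Watermark: events with a timestamp at or below this value are no longer expected. */
        long currentWatermark() {
            return maxTimestamp - outOfOrdernessMillis - 1;
        }
    }

    public static void main(String[] args) {
        WatermarkModel<Event> wm =
                new WatermarkModel<>(Event::eventTime, Duration.ofMillis(100));
        wm.onEvent(new Event(1, 1_000));
        wm.onEvent(new Event(2, 950)); // out of order, but within the 100 ms bound
        wm.onEvent(new Event(1, 1_200));
        System.out.println(wm.currentWatermark()); // prints 1099
    }
}
```

   Without the per-record extractor, the model has nothing to compute `maxTimestamp` from, which is the gap the reviewer is pointing at.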





[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847299267


##########
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java:
##########
@@ -65,7 +67,11 @@ public static void main(String[] args) throws Exception {
                                 Files.size(inputFile),
                                 inputDir.toAbsolutePath().toString(),
                                 containedFile.getFileName().toString()))
-                .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+                .sinkTo(
+                        FileSink.forRowFormat(
+                                        new org.apache.flink.core.fs.Path(outputPath),

Review Comment:
   But you're not using the import from nio, so replace it with the one from Flink.





[GitHub] [flink] flinkbot commented on pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19414:
URL: https://github.com/apache/flink/pull/19414#issuecomment-1093869397

   ## CI report:
   
   * 11e4ba949253a3894a0d264afdf929c4a35fca47 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

Posted by GitBox <gi...@apache.org>.
infoverload commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847269129


##########
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java:
##########
@@ -65,7 +67,11 @@ public static void main(String[] args) throws Exception {
                                 Files.size(inputFile),
                                 inputDir.toAbsolutePath().toString(),
                                 containedFile.getFileName().toString()))
-                .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+                .sinkTo(
+                        FileSink.forRowFormat(
+                                        new org.apache.flink.core.fs.Path(outputPath),

Review Comment:
   I did that because of the `java.nio.file.Path` import.





[GitHub] [flink] infoverload commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

Posted by GitBox <gi...@apache.org>.
infoverload commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847454084


##########
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
##########
@@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 
     private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt)
             throws Exception {
+        Duration maxOutOfOrderness = extractTimestamp(pt);
         KeyedStream<Event, Integer> source =
                 env.addSource(createEventSource(pt))
                         .name("EventSource")
                         .uid("EventSource")
-                        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        .assignTimestampsAndWatermarks(
+                                WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment:
   I tried various things including 
   
   `WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).createTimestampAssigner(createTimestampExtractor(pt))`
   
   and
   
   `WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).withTimestampAssigner(createTimestampExtractor(pt))`
   
   but the types never seem to match.
   
   Not sure how to reconcile the fact that it is looking for a `TimestampAssignerSupplier` but getting a `BoundedOutOfOrdernessTimestampExtractor`.
   





[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r849301627


##########
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
##########
@@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 
     private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt)
             throws Exception {
+        Duration maxOutOfOrderness = extractTimestamp(pt);
         KeyedStream<Event, Integer> source =
                 env.addSource(createEventSource(pt))
                         .name("EventSource")
                         .uid("EventSource")
-                        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        .assignTimestampsAndWatermarks(
+                                WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment:
   I think you need to change the return type of `createTimestampExtractor`.
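   A sketch of what changing the return type could look like, under stated assumptions. In Flink, `WatermarkStrategy#withTimestampAssigner` accepts either a `SerializableTimestampAssigner<T>` or a `TimestampAssignerSupplier<T>`, and `SerializableTimestampAssigner<T>` has a single method `long extractTimestamp(T element, long recordTimestamp)`. The factory would therefore return that type and only describe where the event time lives in the record, while the out-of-orderness bound stays in `forBoundedOutOfOrderness(maxOutOfOrderness)`. To keep the example compilable without Flink on the classpath, `TimestampAssignerShape` is a hypothetical local interface mirroring that method, and `Event`, `eventTime` and `createTimestampAssigner()` are hypothetical names.

```java
import java.io.Serializable;

public class TimestampAssignerSketch {

    /** Hypothetical stand-in for the test's Event type. */
    record Event(int key, long eventTime) {}

    /**
     * Hypothetical local interface mirroring the single method of Flink's
     * SerializableTimestampAssigner<T>; used only so this compiles without Flink.
     */
    @FunctionalInterface
    interface TimestampAssignerShape<T> extends Serializable {
        long extractTimestamp(T element, long recordTimestamp);
    }

    /**
     * Hypothetical replacement factory: unlike the legacy extractor it does not
     * compute watermarks, it only reads the event time from each record.
     */
    static TimestampAssignerShape<Event> createTimestampAssigner() {
        return (event, recordTimestamp) -> event.eventTime();
    }

    public static void main(String[] args) {
        TimestampAssignerShape<Event> assigner = createTimestampAssigner();
        // The second argument (any timestamp already attached to the record) is ignored.
        System.out.println(assigner.extractTimestamp(new Event(7, 42_000L), Long.MIN_VALUE));
        // prints 42000
    }
}
```

   With the real Flink types, the call site would then look roughly like `WatermarkStrategy.<Event>forBoundedOutOfOrderness(maxOutOfOrderness).withTimestampAssigner(createTimestampAssigner())`. The explicit `<Event>` type witness is typically needed because the element type cannot be inferred through the chained call; without it the strategy is typed on `Object`, which is one way the types end up not matching.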





[GitHub] [flink] slinkydeveloper commented on a diff in pull request #19414: [FLINK 26389][tests] update deprecated operators in e2e tests

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on code in PR #19414:
URL: https://github.com/apache/flink/pull/19414#discussion_r847067704


##########
flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java:
##########
@@ -65,7 +67,11 @@ public static void main(String[] args) throws Exception {
                                 Files.size(inputFile),
                                 inputDir.toAbsolutePath().toString(),
                                 containedFile.getFileName().toString()))
-                .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+                .sinkTo(
+                        FileSink.forRowFormat(
+                                        new org.apache.flink.core.fs.Path(outputPath),

Review Comment:
   Remove the FQCN here



##########
flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java:
##########
@@ -90,11 +92,13 @@ public static void main(String[] args) throws Exception {
 
     private static void executeOriginalVariant(StreamExecutionEnvironment env, ParameterTool pt)
             throws Exception {
+        Duration maxOutOfOrderness = extractTimestamp(pt);
         KeyedStream<Event, Integer> source =
                 env.addSource(createEventSource(pt))
                         .name("EventSource")
                         .uid("EventSource")
-                        .assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        .assignTimestampsAndWatermarks(
+                                WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness))

Review Comment:
   I think you still need the watermark assigner here after this method invocation, using `createTimestampExtractor` (same below).


