Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/27 12:37:53 UTC

[GitHub] [flink] zentol commented on a change in pull request #14767: [FLINK-21099] Introduce JobType to distinguish between batch and streaming jobs

zentol commented on a change in pull request #14767:
URL: https://github.com/apache/flink/pull/14767#discussion_r565269327



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java
##########
@@ -105,6 +107,7 @@ public void testOneInputTransformation() {
         assertThat(graph.getStateBackend(), instanceOf(BatchExecutionStateBackend.class));
         // the provider is passed as a lambda therefore we cannot assert the class of the provider
         assertThat(graph.getTimerServiceProvider(), notNullValue());
+        assertThat(graph.getJobType(), is(JobType.BATCH));

Review comment:
       I'm not really sold on this shotgun-style approach of adding the same assertion to existing tests.
   
   I think a single, separate test would be fine for this:
   
   ```
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           SingleOutputStreamOperator<Integer> process =
                   env.fromElements(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
           DataStreamSink<Integer> sink = process.addSink(new DiscardingSink<>());
           StreamGraphGenerator graphGenerator =
                   new StreamGraphGenerator(
                           Collections.singletonList(sink.getTransformation()),
                           env.getConfig(),
                           env.getCheckpointConfig());
           graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
   
           StreamGraph graph = graphGenerator.generate();
           assertThat(graph.getJobType(), is(JobType.BATCH));
   ```

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -751,6 +760,8 @@ public void testPartitionTypesInBatchMode() {
         assertThat(
                 verticesSorted.get(4) /* forward - sink */,
                 hasOutputPartitionType(ResultPartitionType.PIPELINED_BOUNDED));
+
+        assertEquals(JobType.BATCH, jobGraph.getJobType());

Review comment:
       A separate test similar to the STREAMING case would be good. Is there maybe a test somewhere for `StreamGraphGenerator#shouldExecuteInBatchMode` that we could extend?
   
   Otherwise, I'd prefer a test like this:
   
   ```
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setRuntimeMode(RuntimeExecutionMode.BATCH);
           env.fromElements("test").addSink(new DiscardingSink<>());
           JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
           assertEquals(JobType.BATCH, jobGraph.getJobType());
   ```

##########
File path: flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
##########
@@ -94,6 +95,7 @@ private JobGraph createJobGraph(int numRecordsToSend) {
         StreamGraph streamGraph = env.getStreamGraph();
         streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+        streamGraph.setJobType(JobType.BATCH);

Review comment:
       Hmm... maybe we could save ourselves some trouble by implicitly setting the job type whenever certain exchange modes / schedule modes are set.
   
   In any case, a comment would be good here to clarify which of the two settings (exchange mode, schedule mode, or both) results in the job being considered a batch job.
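
A minimal sketch of what such an implicit derivation could look like, assuming that either a lazy schedule mode or the all-blocking exchange mode is enough to mark a job as batch. That rule is an assumption (it is exactly the open question in the comment above), the enums are local stand-ins that carry only a subset of constants rather than Flink's own classes, and `inferJobType` is a hypothetical helper, not an existing Flink method.

```java
import java.util.EnumSet;
import java.util.Set;

public class JobTypeInferenceSketch {

    // Local stand-ins for the Flink enums so the sketch compiles on its own.
    public enum JobType { BATCH, STREAMING }

    public enum ScheduleMode {
        EAGER,
        LAZY_FROM_SOURCES,
        LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST
    }

    public enum GlobalDataExchangeMode { ALL_EDGES_BLOCKING, ALL_EDGES_PIPELINED }

    // Assumption: both lazy schedule modes imply a batch job.
    private static final Set<ScheduleMode> BATCH_SCHEDULE_MODES =
            EnumSet.of(
                    ScheduleMode.LAZY_FROM_SOURCES,
                    ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);

    /**
     * Hypothetical helper: derives the job type from the two settings instead of
     * requiring a separate setJobType call.
     */
    public static JobType inferJobType(
            ScheduleMode scheduleMode, GlobalDataExchangeMode exchangeMode) {
        // Assumption: either setting on its own is sufficient to mark the job as batch.
        boolean batch =
                BATCH_SCHEDULE_MODES.contains(scheduleMode)
                        || exchangeMode == GlobalDataExchangeMode.ALL_EDGES_BLOCKING;
        return batch ? JobType.BATCH : JobType.STREAMING;
    }

    public static void main(String[] args) {
        // Mirrors the combination set in BlockingShuffleITCase#createJobGraph.
        System.out.println(
                inferJobType(
                        ScheduleMode.LAZY_FROM_SOURCES,
                        GlobalDataExchangeMode.ALL_EDGES_BLOCKING));
    }
}
```

If only one of the two settings should count, the `||` collapses to a single condition, which would also make the requested clarifying comment trivial to write.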

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
##########
@@ -70,6 +71,7 @@ public static void setBatchProperties(StreamGraph streamGraph, TableConfig table
         streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
         streamGraph.setStateBackend(null);
         streamGraph.setCheckpointStorage(null);
+        streamGraph.setJobType(JobType.BATCH);

Review comment:
       Let's move this right next to the schedule mode and add a comment noting that it depends on the schedule mode.



