You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/16 09:21:12 UTC

[GitHub] [flink] aljoscha commented on a change in pull request #13656: [FLINK-19583] Expose the execution.runtime-mode to users

aljoscha commented on a change in pull request #13656:
URL: https://github.com/apache/flink/pull/13656#discussion_r506211458



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java
##########
@@ -19,37 +19,129 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.streaming.api.RuntimeExecutionMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for the detection of the {@link RuntimeExecutionMode runtime execution mode} during
  * stream graph translation.
  */
 public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger {
 
+	@Test
+	public void testExecutionModePropagationFromEnvWithDefaultAndBoundedSource() {
+		final StreamExecutionEnvironment environment =
+				StreamExecutionEnvironment.getExecutionEnvironment();
+
+		environment
+				.fromSource(
+						new MockSource(Boundedness.BOUNDED, 100),
+						WatermarkStrategy.noWatermarks(),
+						"bounded-source")
+				.print();
+
+		assertThat(
+				environment.getStreamGraph(),
+				hasProperties(
+						GlobalDataExchangeMode.ALL_EDGES_PIPELINED,
+						ScheduleMode.EAGER,
+						true));
+	}
+
+	@Test
+	public void testExecutionModePropagationFromEnvWithDefaultAndUnboundedSource() {
+		final StreamExecutionEnvironment environment =
+				StreamExecutionEnvironment.getExecutionEnvironment();
+
+		environment
+				.fromSource(
+						new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 100),
+						WatermarkStrategy.noWatermarks(),
+						"unbounded-source")
+				.print();
+
+		assertThat(
+				environment.getStreamGraph(),
+				hasProperties(
+						GlobalDataExchangeMode.ALL_EDGES_PIPELINED,
+						ScheduleMode.EAGER,
+						true));
+	}
+
+	@Test
+	public void testExecutionModePropagationFromEnvWithAutomaticAndBoundedSource() {
+		final Configuration config = new Configuration();
+		config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.AUTOMATIC);
+
+		final StreamExecutionEnvironment environment =
+				StreamExecutionEnvironment.getExecutionEnvironment();
+		environment.configure(config, getClass().getClassLoader());
+
+		environment
+				.fromSource(
+						new MockSource(Boundedness.BOUNDED, 100),
+						WatermarkStrategy.noWatermarks(),
+						"bounded-source")
+				.print();
+
+		assertThat(
+				environment.getStreamGraph(),
+				hasProperties(
+						GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED,
+						ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+						false));
+	}
+
+	@Test(expected = IllegalStateException.class)

Review comment:
       Maybe using the `ExpectedException` rule would be better here because it's more fine-grained and allows matching on the exception message.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org