Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/03 06:25:53 UTC

[flink] branch master updated: [FLINK-19084] Remove deprecated methods from ExecutionConfig

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a274a0  [FLINK-19084] Remove deprecated methods from ExecutionConfig
1a274a0 is described below

commit 1a274a0e6777587826efa635e7bf4af253ff04f9
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Sep 3 08:23:40 2020 +0200

    [FLINK-19084] Remove deprecated methods from ExecutionConfig
    
    This commit removes deprecated, ineffective methods:
     - set/isFailTaskOnCheckpointError
     - disable/enableSysoutLogging
     - isLatencyTrackingEnabled
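    
    A minimal migration sketch for code that still calls the removed methods,
    assuming the StreamExecutionEnvironment, ExecutionConfig#setLatencyTrackingInterval,
    and CheckpointConfig#setTolerableCheckpointFailureNumber APIs referenced in the
    deprecation notes removed below (a sketch, not part of the removal itself):
    
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        
        public class MigrationSketch {
            public static void main(String[] args) {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(5000);
        
                // Latency tracking is controlled via the interval alone: a positive value
                // enables it, 0 disables it (replaces the removed isLatencyTrackingEnabled()).
                env.getConfig().setLatencyTrackingInterval(30_000L);
        
                // Behaviour on checkpoint errors lives on CheckpointConfig (replaces the removed
                // set/isFailTaskOnCheckpointError); 0 tolerable failures means the job fails
                // on the first checkpoint error.
                env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
        
                // enable/disableSysoutLogging were no-ops and have no replacement.
            }
        }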
---
 docs/dev/execution_configuration.md                |  2 -
 docs/dev/execution_configuration.zh.md             |  2 -
 docs/ops/cli.md                                    |  4 --
 docs/ops/cli.zh.md                                 |  4 --
 .../apache/flink/client/cli/CliFrontendParser.java | 10 ----
 .../flink/client/cli/CliFrontendRunTest.java       |  6 ---
 .../connectors/kafka/KafkaConsumerTestBase.java    |  4 --
 flink-core/pom.xml                                 |  5 ++
 .../apache/flink/api/common/ExecutionConfig.java   | 63 ----------------------
 .../test/Elasticsearch5SinkExample.scala           |  1 -
 flink-end-to-end-tests/test-scripts/test_cli.sh    |  2 +-
 flink-python/pyflink/common/execution_config.py    | 26 ---------
 12 files changed, 6 insertions(+), 123 deletions(-)

diff --git a/docs/dev/execution_configuration.md b/docs/dev/execution_configuration.md
index 02cdea4..3907cf1 100644
--- a/docs/dev/execution_configuration.md
+++ b/docs/dev/execution_configuration.md
@@ -62,8 +62,6 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
 
 - `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.
 
-- **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows to disable this behavior.
-
 - `getGlobalJobParameters()` / `setGlobalJobParameters()` This method allows users to set custom objects as a global configuration for the job. Since the `ExecutionConfig` is accessible in all user defined functions, this is an easy method for making configuration globally available in a job.
 
 - `addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)` Register a Kryo serializer instance for the given `type`.
diff --git a/docs/dev/execution_configuration.zh.md b/docs/dev/execution_configuration.zh.md
index 0252487..02efdc0 100644
--- a/docs/dev/execution_configuration.zh.md
+++ b/docs/dev/execution_configuration.zh.md
@@ -62,8 +62,6 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
 
 - `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.
 
-- **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows to disable this behavior.
-
 - `getGlobalJobParameters()` / `setGlobalJobParameters()` This method allows users to set custom objects as a global configuration for the job. Since the `ExecutionConfig` is accessible in all user defined functions, this is an easy method for making configuration globally available in a job.
 
 - `addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)` Register a Kryo serializer instance for the given `type`.
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 2872472..aad9a78 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -92,10 +92,6 @@ These examples about how to submit a job in CLI.
         ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
                              --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
 
--   Run example program with flink log output disabled:
-
-            ./bin/flink run -q ./examples/batch/WordCount.jar
-
 -   Run example program in detached mode:
 
             ./bin/flink run -d ./examples/batch/WordCount.jar
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index 3a9f471..5739227 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -92,10 +92,6 @@ option.
         ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
                              --input file:///home/user/hamlet.txt --output file:///home/user/wordcount_out
 
--   Run example program with flink log output disabled:
-
-            ./bin/flink run -q ./examples/batch/WordCount.jar
-
 -   Run example program in detached mode:
 
             ./bin/flink run -d ./examples/batch/WordCount.jar
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 49b532c..10f79ad 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -56,14 +56,6 @@ public class CliFrontendParser {
 	public static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
 			"The parallelism with which to run the program. Optional flag to override the default value " +
 			"specified in the configuration.");
-
-	/**
-	 * @deprecated This has no effect anymore, we're keeping it to not break existing bash scripts.
-	 */
-	@Deprecated
-	static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
-			"suppress logging output to standard out.");
-
 	public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
 			"the job in detached mode");
 
@@ -184,7 +176,6 @@ public class CliFrontendParser {
 		PARALLELISM_OPTION.setRequired(false);
 		PARALLELISM_OPTION.setArgName("parallelism");
 
-		LOGGING_OPTION.setRequired(false);
 		DETACHED_OPTION.setRequired(false);
 		SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
 		YARN_DETACHED_OPTION.setRequired(false);
@@ -245,7 +236,6 @@ public class CliFrontendParser {
 		options.addOption(CLASSPATH_OPTION);
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(ARGS_OPTION);
-		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
 		options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
 		options.addOption(YARN_DETACHED_OPTION);
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index f3280d0..73402d2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -72,12 +72,6 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 			verifyCliFrontend(getCli(configuration), parameters, 42, false);
 		}
 
-		// test configure sysout logging
-		{
-			String[] parameters = {"-p", "2", "-q", getTestJarPath()};
-			verifyCliFrontend(getCli(configuration), parameters, 2, false);
-		}
-
 		// test detached mode
 		{
 			String[] parameters = {"-p", "2", "-d", getTestJarPath()};
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7f0f0ed..9d779f6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -166,7 +166,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 			Properties properties = new Properties();
 
 			StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
-			see.getConfig().disableSysoutLogging();
 			see.setRestartStrategy(RestartStrategies.noRestart());
 			see.setParallelism(1);
 
@@ -1519,7 +1518,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
 		env1.setParallelism(1);
 		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env1.getConfig().disableSysoutLogging();
 
 		Properties props = new Properties();
 		props.putAll(standardProps);
@@ -1553,7 +1551,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		env1.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env1.setParallelism(1);
 		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env1.getConfig().disableSysoutLogging();
 
 		Properties props = new Properties();
 		props.putAll(standardProps);
@@ -1619,7 +1616,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
 		env1.setParallelism(1);
 		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-		env1.getConfig().disableSysoutLogging();
 		env1.disableOperatorChaining(); // let the source read everything into the network buffers
 
 		TypeInformationSerializationSchema<Tuple2<Integer, Integer>> schema = new TypeInformationSerializationSchema<>(
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 8ee4996..e65aff4 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -184,6 +184,11 @@ under the License.
 							<!-- leaked constructor in TypeHint -->
 							<exclude>org.apache.flink.api.common.typeinfo.TypeHint</exclude>
 							<exclude>org.apache.flink.api.java.typeutils.TypeInfoParser</exclude>
+
+							<!-- removed in 1.12, before that the settings were ineffective anyway -->
+							<exclude>org.apache.flink.api.common.ExecutionConfig#disableSysoutLogging()</exclude>
+							<exclude>org.apache.flink.api.common.ExecutionConfig#enableSysoutLogging()</exclude>
+							<exclude>org.apache.flink.api.common.ExecutionConfig#isSysoutLoggingEnabled()</exclude>
 						</excludes>
 					</parameter>
 				</configuration>
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index e362d9a..170d5c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -154,13 +154,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 
 	/** This flag defines if we use compression for the state snapshot data or not. Default: false */
 	private boolean useSnapshotCompression = false;
-
-	/**
-	 * @deprecated Should no longer be used because we would not support to let task directly fail on checkpoint error.
-	 */
-	@Deprecated
-	private boolean failTaskOnCheckpointError = true;
-
 	/** The default input dependency constraint to schedule tasks. */
 	private InputDependencyConstraint defaultInputDependencyConstraint = InputDependencyConstraint.ANY;
 
@@ -279,15 +272,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 		return latencyTrackingInterval;
 	}
 
-	/**
-	 * @deprecated will be removed in a future version
-	 */
-	@PublicEvolving
-	@Deprecated
-	public boolean isLatencyTrackingEnabled() {
-		return isLatencyTrackingConfigured && latencyTrackingInterval > 0;
-	}
-
 	@Internal
 	public boolean isLatencyTrackingConfigured() {
 		return isLatencyTrackingConfigured;
@@ -721,30 +705,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 		return objectReuse;
 	}
 
-	/**
-	 * @deprecated Ineffective. Will be removed at 2.0.
-	 */
-	@Deprecated
-	public ExecutionConfig enableSysoutLogging() {
-		return this;
-	}
-
-	/**
-	 * @deprecated Ineffective. Will be removed at 2.0.
-	 */
-	@Deprecated
-	public ExecutionConfig disableSysoutLogging() {
-		return this;
-	}
-
-	/**
-	 * @deprecated Ineffective. Will be removed at 2.0.
-	 */
-	@Deprecated
-	public boolean isSysoutLoggingEnabled() {
-		return false;
-	}
-
 	public GlobalJobParameters getGlobalJobParameters() {
 		return globalJobParameters;
 	}
@@ -936,28 +896,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 		this.useSnapshotCompression = useSnapshotCompression;
 	}
 
-	/**
-	 * @deprecated This method takes no effect since we would not forward the configuration from the checkpoint config
-	 * to the task, and we have not supported task to fail on checkpoint error.
-	 * Please use CheckpointConfig.getTolerableCheckpointFailureNumber() to know the behavior on checkpoint errors.
-	 */
-	@Deprecated
-	@Internal
-	public boolean isFailTaskOnCheckpointError() {
-		return failTaskOnCheckpointError;
-	}
-
-	/**
-	 * @deprecated This method takes no effect since we would not forward the configuration from the checkpoint config
-	 * to the task, and we have not supported task to fail on checkpoint error.
-	 * Please use CheckpointConfig.setTolerableCheckpointFailureNumber(int) to determine the behavior on checkpoint errors.
-	 */
-	@Deprecated
-	@Internal
-	public void setFailTaskOnCheckpointError(boolean failTaskOnCheckpointError) {
-		this.failTaskOnCheckpointError = failTaskOnCheckpointError;
-	}
-
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof ExecutionConfig) {
@@ -1034,7 +972,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 			", taskCancellationIntervalMillis=" + taskCancellationIntervalMillis +
 			", taskCancellationTimeoutMillis=" + taskCancellationTimeoutMillis +
 			", useSnapshotCompression=" + useSnapshotCompression +
-			", failTaskOnCheckpointError=" + failTaskOnCheckpointError +
 			", defaultInputDependencyConstraint=" + defaultInputDependencyConstraint +
 			", globalJobParameters=" + globalJobParameters +
 			", registeredTypesWithKryoSerializers=" + registeredTypesWithKryoSerializers +
diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala
index 9e2a115..7b056d1 100644
--- a/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala
+++ b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/Elasticsearch5SinkExample.scala
@@ -42,7 +42,6 @@ object Elasticsearch5SinkExample {
       return
     }
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.disableSysoutLogging
     env.enableCheckpointing(5000)
 
     val source: DataStream[(String)] = env.generateSequence(0, 20 - 1)
diff --git a/flink-end-to-end-tests/test-scripts/test_cli.sh b/flink-end-to-end-tests/test-scripts/test_cli.sh
index 4b04890..bc4654b 100755
--- a/flink-end-to-end-tests/test-scripts/test_cli.sh
+++ b/flink-end-to-end-tests/test-scripts/test_cli.sh
@@ -85,7 +85,7 @@ if [ $EXIT_CODE == 0 ]; then
     printf "\n==============================================================================\n"
     printf "Test job launch with complex parameter set\n"
     printf "==============================================================================\n"
-    eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q \
+    eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 \
       -c org.apache.flink.examples.java.wordcount.WordCount \
       $FLINK_DIR/examples/batch/WordCount.jar \
       --input file:///$FLINK_DIR/README.txt \
diff --git a/flink-python/pyflink/common/execution_config.py b/flink-python/pyflink/common/execution_config.py
index 7c2a5e2..586ca1b 100644
--- a/flink-python/pyflink/common/execution_config.py
+++ b/flink-python/pyflink/common/execution_config.py
@@ -501,32 +501,6 @@ class ExecutionConfig(object):
         """
         return self._j_execution_config.isObjectReuseEnabled()
 
-    def enable_sysout_logging(self):
-        """
-        Enables the printing of progress update messages to stdout.
-
-        :return: This object.
-        """
-        self._j_execution_config = self._j_execution_config.enableSysoutLogging()
-        return self
-
-    def disable_sysout_logging(self):
-        """
-        Disables the printing of progress update messages to stdout.
-
-        :return: This object.
-        """
-        self._j_execution_config = self._j_execution_config.disableSysoutLogging()
-        return self
-
-    def is_sysout_logging_enabled(self):
-        """
-        Gets whether progress update messages should be printed to stdout.
-
-        :return: True, if progress update messages should be printed, false otherwise.
-        """
-        return self._j_execution_config.isSysoutLoggingEnabled()
-
     def get_global_job_parameters(self):
         """
         Gets current configuration dict.