You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2024/01/30 01:56:52 UTC
(flink) 02/02: [FLINK-34257][core] Update Flink YAML Parser to Support YAML 1.2 Specification.
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 081051a2cacaddf6dfe613da061f15f28a015a41
Author: JunRuiLee <jr...@gmail.com>
AuthorDate: Mon Jan 29 17:35:19 2024 +0800
[FLINK-34257][core] Update Flink YAML Parser to Support YAML 1.2 Specification.
This closes #24213.
---
flink-core/pom.xml | 7 +
.../flink/configuration/YamlParserUtils.java | 167 +++++++++++----------
.../flink/configuration/YamlParserUtilsTest.java | 63 +++++++-
flink-dist/src/main/resources/META-INF/NOTICE | 1 +
flink-python/dev/dev-requirements.txt | 2 +-
flink-python/pyflink/common/configuration.py | 5 +-
flink-python/pyflink/pyflink_gateway_server.py | 5 +-
flink-python/setup.py | 2 +-
8 files changed, 159 insertions(+), 93 deletions(-)
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 917ef53cfaf..618ac8c15dc 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -96,6 +96,13 @@ under the License.
<!-- managed version -->
</dependency>
+ <!-- YAML parser utilities -->
+ <dependency>
+ <groupId>org.snakeyaml</groupId>
+ <artifactId>snakeyaml-engine</artifactId>
+ <version>2.6</version>
+ </dependency>
+
<!-- standard utilities -->
<dependency>
<groupId>org.apache.commons</groupId>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java
index ae9280b3a86..a8b845043f6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java
@@ -20,20 +20,21 @@ package org.apache.flink.configuration;
import org.apache.flink.util.TimeUtils;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.DumperOptions;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.LoaderOptions;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.constructor.Constructor;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.Mark;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.MarkedYAMLException;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.YAMLException;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.nodes.Node;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.nodes.Tag;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.representer.Represent;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.representer.Representer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.snakeyaml.engine.v2.api.Dump;
+import org.snakeyaml.engine.v2.api.DumpSettings;
+import org.snakeyaml.engine.v2.api.Load;
+import org.snakeyaml.engine.v2.api.LoadSettings;
+import org.snakeyaml.engine.v2.common.FlowStyle;
+import org.snakeyaml.engine.v2.exceptions.Mark;
+import org.snakeyaml.engine.v2.exceptions.MarkedYamlEngineException;
+import org.snakeyaml.engine.v2.exceptions.YamlEngineException;
+import org.snakeyaml.engine.v2.nodes.Node;
+import org.snakeyaml.engine.v2.nodes.ScalarNode;
+import org.snakeyaml.engine.v2.nodes.Tag;
+import org.snakeyaml.engine.v2.representer.StandardRepresenter;
+import org.snakeyaml.engine.v2.schema.CoreSchema;
import javax.annotation.Nonnull;
@@ -47,6 +48,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* This class contains utility methods to load standard yaml file and convert object to standard
@@ -56,26 +58,30 @@ public class YamlParserUtils {
private static final Logger LOG = LoggerFactory.getLogger(YamlParserUtils.class);
- private static final Yaml yaml;
+ private static final DumpSettings blockerDumperSettings =
+ DumpSettings.builder()
+ .setDefaultFlowStyle(FlowStyle.BLOCK)
+ // Disable splitting of long lines to avoid adding unexpected line breaks
+ .setSplitLines(false)
+ .setSchema(new CoreSchema())
+ .build();
- private static final DumperOptions dumperOptions = new DumperOptions();
+ private static final DumpSettings flowDumperSettings =
+ DumpSettings.builder()
+ .setDefaultFlowStyle(FlowStyle.FLOW)
+ // Disable splitting of long lines to avoid adding unexpected line breaks
+ .setSplitLines(false)
+ .setSchema(new CoreSchema())
+ .build();
- private static final LoaderOptions loaderOptions = new LoaderOptions();
+ private static final Dump blockerDumper =
+ new Dump(blockerDumperSettings, new FlinkConfigRepresenter(blockerDumperSettings));
- static {
- // Make the dump output is in single line
- dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW);
- dumperOptions.setWidth(Integer.MAX_VALUE);
- // The standard YAML do not allow duplicate keys.
- loaderOptions.setAllowDuplicateKeys(false);
+ private static final Dump flowDumper =
+ new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings));
- yaml =
- new Yaml(
- new Constructor(loaderOptions),
- new FlinkConfigRepresenter(dumperOptions),
- dumperOptions,
- loaderOptions);
- }
+ private static final Load loader =
+ new Load(LoadSettings.builder().setSchema(new CoreSchema()).build());
/**
* Loads the contents of the given YAML file into a map.
@@ -84,21 +90,22 @@ public class YamlParserUtils {
* @return a non-null map representing the YAML content. If the file is empty or only contains
* comments, an empty map is returned.
* @throws FileNotFoundException if the YAML file is not found.
- * @throws YAMLException if the file cannot be parsed.
+ * @throws YamlEngineException if the file cannot be parsed.
* @throws IOException if an I/O error occurs while reading from the file stream.
*/
public static synchronized @Nonnull Map<String, Object> loadYamlFile(File file)
throws Exception {
try (FileInputStream inputStream = new FileInputStream((file))) {
- Map<String, Object> yamlResult = yaml.load(inputStream);
+ Map<String, Object> yamlResult =
+ (Map<String, Object>) loader.loadFromInputStream(inputStream);
return yamlResult == null ? new HashMap<>() : yamlResult;
} catch (FileNotFoundException e) {
LOG.error("Failed to find YAML file", e);
throw e;
- } catch (IOException | YAMLException e) {
- if (e instanceof MarkedYAMLException) {
- YAMLException exception =
- wrapExceptionToHiddenSensitiveData((MarkedYAMLException) e);
+ } catch (IOException | YamlEngineException e) {
+ if (e instanceof MarkedYamlEngineException) {
+ YamlEngineException exception =
+ wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
LOG.error("Failed to parse YAML configuration", exception);
throw exception;
} else {
@@ -122,14 +129,14 @@ public class YamlParserUtils {
*/
public static synchronized String toYAMLString(Object value) {
try {
- String output = yaml.dump(value);
+ String output = flowDumper.dumpToString(value);
// remove the line break
- String linebreak = dumperOptions.getLineBreak().getString();
+ String linebreak = flowDumperSettings.getBestLineBreak();
if (output.endsWith(linebreak)) {
output = output.substring(0, output.length() - linebreak.length());
}
return output;
- } catch (MarkedYAMLException exception) {
+ } catch (MarkedYamlEngineException exception) {
throw wrapExceptionToHiddenSensitiveData(exception);
}
}
@@ -159,18 +166,18 @@ public class YamlParserUtils {
}
currentMap.put(keys[keys.length - 1], entry.getValue());
}
- String data = yaml.dumpAsMap(nestedMap);
- String linebreak = dumperOptions.getLineBreak().getString();
+ String data = blockerDumper.dumpToString(nestedMap);
+ String linebreak = blockerDumperSettings.getBestLineBreak();
return Arrays.asList(data.split(linebreak));
- } catch (MarkedYAMLException exception) {
+ } catch (MarkedYamlEngineException exception) {
throw wrapExceptionToHiddenSensitiveData(exception);
}
}
public static synchronized <T> T convertToObject(String value, Class<T> type) {
try {
- return yaml.loadAs(value, type);
- } catch (MarkedYAMLException exception) {
+ return type.cast(loader.loadFromString(value));
+ } catch (MarkedYamlEngineException exception) {
throw wrapExceptionToHiddenSensitiveData(exception);
}
}
@@ -199,44 +206,48 @@ public class YamlParserUtils {
* in 'reader', line 2, column 1
* }</pre>
*
- * @param exception The MarkedYAMLException containing potentially sensitive data.
- * @return A YAMLException with a message that has sensitive data hidden.
+ * @param exception The MarkedYamlEngineException containing potentially sensitive data.
+ * @return A YamlEngineException with a message that has sensitive data hidden.
*/
- private static YAMLException wrapExceptionToHiddenSensitiveData(MarkedYAMLException exception) {
+ private static YamlEngineException wrapExceptionToHiddenSensitiveData(
+ MarkedYamlEngineException exception) {
StringBuilder lines = new StringBuilder();
String context = exception.getContext();
- Mark contextMark = exception.getContextMark();
+ Optional<Mark> contextMark = exception.getContextMark();
+ Optional<Mark> problemMark = exception.getProblemMark();
String problem = exception.getProblem();
- Mark problemMark = exception.getProblemMark();
if (context != null) {
lines.append(context);
lines.append("\n");
}
- if (contextMark != null
+
+ if (contextMark.isPresent()
&& (problem == null
- || problemMark == null
- || contextMark.getName().equals(problemMark.getName())
- || (contextMark.getLine() != problemMark.getLine())
- || (contextMark.getColumn() != problemMark.getColumn()))) {
- lines.append(hiddenSensitiveDataInMark(contextMark));
+ || !problemMark.isPresent()
+ || contextMark.get().getName().equals(problemMark.get().getName())
+ || contextMark.get().getLine() != problemMark.get().getLine()
+ || contextMark.get().getColumn() != problemMark.get().getColumn())) {
+ lines.append(hiddenSensitiveDataInMark(contextMark.get()));
lines.append("\n");
}
+
if (problem != null) {
lines.append(problem);
lines.append("\n");
}
- if (problemMark != null) {
- lines.append(hiddenSensitiveDataInMark(problemMark));
+
+ if (problemMark.isPresent()) {
+ lines.append(hiddenSensitiveDataInMark(problemMark.get()));
lines.append("\n");
}
Throwable cause = exception.getCause();
- if (cause instanceof MarkedYAMLException) {
- cause = wrapExceptionToHiddenSensitiveData((MarkedYAMLException) cause);
+ if (cause instanceof MarkedYamlEngineException) {
+ cause = wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) cause);
}
- YAMLException yamlException = new YAMLException(lines.toString(), cause);
+ YamlEngineException yamlException = new YamlEngineException(lines.toString(), cause);
yamlException.setStackTrace(exception.getStackTrace());
return yamlException;
}
@@ -254,37 +265,27 @@ public class YamlParserUtils {
+ (mark.getColumn() + 1);
}
- private static class FlinkConfigRepresenter extends Representer {
- public FlinkConfigRepresenter(DumperOptions options) {
- super(options);
- representers.put(Duration.class, new RepresentDuration());
- representers.put(MemorySize.class, new RepresentMemorySize());
- multiRepresenters.put(Enum.class, new RepresentEnum());
+ private static class FlinkConfigRepresenter extends StandardRepresenter {
+ public FlinkConfigRepresenter(DumpSettings dumpSettings) {
+ super(dumpSettings);
+ representers.put(Duration.class, this::representDuration);
+ representers.put(MemorySize.class, this::representMemorySize);
+ parentClassRepresenters.put(Enum.class, this::representEnum);
}
- private class RepresentDuration implements Represent {
- @Override
- public Node representData(Object data) {
- Duration duration = (Duration) data;
- String durationString = TimeUtils.formatWithHighestUnit(duration);
- return representScalar(getTag(duration.getClass(), Tag.STR), durationString, null);
- }
+ private Node representDuration(Object data) {
+ Duration duration = (Duration) data;
+ String durationString = TimeUtils.formatWithHighestUnit(duration);
+ return new ScalarNode(Tag.STR, durationString, settings.getDefaultScalarStyle());
}
- private class RepresentMemorySize implements Represent {
- @Override
- public Node representData(Object data) {
- MemorySize memorySize = (MemorySize) data;
- return representScalar(
- getTag(memorySize.getClass(), Tag.STR), memorySize.toString(), null);
- }
+ private Node representMemorySize(Object data) {
+ MemorySize memorySize = (MemorySize) data;
+ return new ScalarNode(Tag.STR, memorySize.toString(), settings.getDefaultScalarStyle());
}
- private class RepresentEnum implements Represent {
- @Override
- public Node representData(Object data) {
- return representScalar(getTag(data.getClass(), Tag.STR), data.toString(), null);
- }
+ private Node representEnum(Object data) {
+ return new ScalarNode(Tag.STR, data.toString(), settings.getDefaultScalarStyle());
}
}
}
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java
index f857914238f..df22563c474 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java
@@ -20,11 +20,10 @@ package org.apache.flink.configuration;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.YAMLException;
-
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.snakeyaml.engine.v2.exceptions.YamlEngineException;
import java.io.File;
import java.io.FileNotFoundException;
@@ -75,6 +74,62 @@ class YamlParserUtilsTest {
assertThat(yamlData.get("key7")).isEqualTo("true");
}
+ /**
+ * Guards against unexpected behavior changes in Flink configuration parsing caused by
+ * differences between YAML 1.2 and its predecessor, YAML 1.1. The cases are based on the
+ * <a href="https://yaml.org/spec/1.2.2/ext/changes">YAML Changes</a> page.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ void testYaml12Features() {
+ // In YAML 1.2, only true and false strings are parsed as booleans (including True and
+ // TRUE); y, yes, on, and their negative counterparts are parsed as strings.
+ String booleanRepresentation = "key1: Yes\n" + "key2: y\n" + "key3: on";
+ Map<String, String> expectedBooleanRepresentation = new HashMap<>();
+ expectedBooleanRepresentation.put(
+ "key1", "Yes"); // YAML 1.1 would parse this value as Boolean#TRUE
+ expectedBooleanRepresentation.put(
+ "key2", "y"); // YAML 1.1 would parse this value as Boolean#TRUE
+ expectedBooleanRepresentation.put(
+ "key3", "on"); // YAML 1.1 would parse this value as Boolean#TRUE
+ assertThat(YamlParserUtils.convertToObject(booleanRepresentation, Map.class))
+ .containsAllEntriesOf(expectedBooleanRepresentation);
+
+ // In YAML 1.2, underscores '_' cannot be used within numerical values.
+ String underlineInNumber = "key1: 1_000";
+ assertThat(YamlParserUtils.convertToObject(underlineInNumber, Map.class))
+ .containsEntry(
+ "key1",
+ "1_000"); // In YAML 1.1, the expected value is number 1000 not a string.
+
+ // In YAML 1.2, Octal values need a 0o prefix; e.g. 010 is now parsed with the value 10
+ // rather than 8.
+ String octalNumber1 = "octal: 010";
+ assertThat(YamlParserUtils.convertToObject(octalNumber1, Map.class))
+ .containsEntry("octal", 10); // In YAML 1.1, the expected value is number 8.
+ String octalNumber2 = "octal: 0o10";
+ assertThat(YamlParserUtils.convertToObject(octalNumber2, Map.class))
+ .containsEntry("octal", 8);
+
+ // In YAML 1.2, the binary and sexagesimal integer formats have been dropped.
+ String binaryNumber = "binary: 0b101";
+ assertThat(YamlParserUtils.convertToObject(binaryNumber, Map.class))
+ .containsEntry(
+ "binary",
+ "0b101"); // In YAML 1.1, the expected value is number 5 not a string.
+ String sexagesimalNumber = "sexagesimal: 1:00";
+ assertThat(YamlParserUtils.convertToObject(sexagesimalNumber, Map.class))
+ .containsEntry(
+ "sexagesimal",
+ "1:00"); // In YAML 1.1, the expected value is number 60 not a string.
+
+ // In YAML 1.2, the !!pairs, !!omap, !!set, !!timestamp and !!binary types have been
+ // dropped.
+ String timestamp = "!!timestamp 2001-12-15T02:59:43.1Z";
+ assertThatThrownBy(() -> YamlParserUtils.convertToObject(timestamp, Object.class))
+ .isInstanceOf(YamlEngineException.class);
+ }
+
@Test
void testLoadEmptyYamlFile() throws Exception {
File confFile = new File(tmpDir, "test.yaml");
@@ -92,7 +147,7 @@ class YamlParserUtilsTest {
throw new RuntimeException(e);
}
assertThatThrownBy(() -> YamlParserUtils.loadYamlFile(confFile))
- .isInstanceOf(YAMLException.class)
+ .isInstanceOf(YamlEngineException.class)
.satisfies(
e ->
Assertions.assertThat(ExceptionUtils.stringifyException(e))
@@ -109,7 +164,7 @@ class YamlParserUtilsTest {
throw new RuntimeException(e);
}
assertThatThrownBy(() -> YamlParserUtils.loadYamlFile(confFile))
- .isInstanceOf(YAMLException.class)
+ .isInstanceOf(YamlEngineException.class)
.satisfies(
e ->
Assertions.assertThat(ExceptionUtils.stringifyException(e))
diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE
index f5a1fc893c0..2eb8a611431 100644
--- a/flink-dist/src/main/resources/META-INF/NOTICE
+++ b/flink-dist/src/main/resources/META-INF/NOTICE
@@ -21,6 +21,7 @@ This project bundles the following dependencies under the Apache Software Licens
- org.objenesis:objenesis:2.1
- org.xerial.snappy:snappy-java:1.1.10.4
- tools.profiler:async-profiler:2.9
+- org.snakeyaml:snakeyaml-engine:2.6
This project bundles the following dependencies under the BSD license.
See bundled license files for details.
diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt
index f692a63f0f8..0e7d3fadb0c 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -32,4 +32,4 @@ pemja==0.4.1; platform_system != 'Windows'
httplib2>=0.19.0
protobuf>=3.19.0
pytest~=7.0
-pyyaml>=6.0.1
+ruamel.yaml>=0.18.4
diff --git a/flink-python/pyflink/common/configuration.py b/flink-python/pyflink/common/configuration.py
index 35416b8fa47..779568da151 100644
--- a/flink-python/pyflink/common/configuration.py
+++ b/flink-python/pyflink/common/configuration.py
@@ -78,8 +78,9 @@ class Configuration:
def parse_jars_value(value: str, jvm):
is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
if is_standard_yaml:
- import yaml
- jar_urls_list = yaml.safe_load(value)
+ from ruamel.yaml import YAML
+ yaml = YAML(typ='safe')
+ jar_urls_list = yaml.load(value)
if isinstance(jar_urls_list, list):
return jar_urls_list
return value.split(";")
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index 19fd24bf7e5..0eae1e961ee 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -44,7 +44,8 @@ def on_windows():
def read_from_config(key, default_value, flink_conf_directory):
- import yaml
+ from ruamel.yaml import YAML
+ yaml = YAML(typ='safe')
# try to find flink-conf.yaml file in flink_conf_directory
flink_conf_file = os.path.join(flink_conf_directory, "flink-conf.yaml")
if os.path.isfile(flink_conf_file):
@@ -68,7 +69,7 @@ def read_from_config(key, default_value, flink_conf_directory):
if os.path.isfile(config_file):
# If config.yaml exists, use YAML parser to read the value
with open(os.path.realpath(config_file), "r") as f:
- config = yaml.safe_load(f)
+ config = yaml.load(f)
flat_config = flatten_config(config)
return flat_config.get(key, default_value)
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 24637fe4c7d..e5e1e231b22 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -326,7 +326,7 @@ try:
'pyarrow>=5.0.0',
'pemja==0.4.1;platform_system != "Windows"',
'httplib2>=0.19.0',
- 'pyyaml>=6.0.1',
+ 'ruamel.yaml>=0.18.4',
apache_flink_libraries_dependency]
setup(