You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2024/01/30 01:56:52 UTC

(flink) 02/02: [FLINK-34257][core] Update Flink YAML Parser to Support YAML 1.2 Specification.

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 081051a2cacaddf6dfe613da061f15f28a015a41
Author: JunRuiLee <jr...@gmail.com>
AuthorDate: Mon Jan 29 17:35:19 2024 +0800

    [FLINK-34257][core] Update Flink YAML Parser to Support YAML 1.2 Specification.
    
    This closes #24213.
---
 flink-core/pom.xml                                 |   7 +
 .../flink/configuration/YamlParserUtils.java       | 167 +++++++++++----------
 .../flink/configuration/YamlParserUtilsTest.java   |  63 +++++++-
 flink-dist/src/main/resources/META-INF/NOTICE      |   1 +
 flink-python/dev/dev-requirements.txt              |   2 +-
 flink-python/pyflink/common/configuration.py       |   5 +-
 flink-python/pyflink/pyflink_gateway_server.py     |   5 +-
 flink-python/setup.py                              |   2 +-
 8 files changed, 159 insertions(+), 93 deletions(-)

diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 917ef53cfaf..618ac8c15dc 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -96,6 +96,13 @@ under the License.
 			<!-- managed version -->
 		</dependency>
 
+		<!-- YAML parser utilities -->
+		<dependency>
+			<groupId>org.snakeyaml</groupId>
+			<artifactId>snakeyaml-engine</artifactId>
+			<version>2.6</version>
+		</dependency>
+
 		<!-- standard utilities -->
 		<dependency>
 			<groupId>org.apache.commons</groupId>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java
index ae9280b3a86..a8b845043f6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/YamlParserUtils.java
@@ -20,20 +20,21 @@ package org.apache.flink.configuration;
 
 import org.apache.flink.util.TimeUtils;
 
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.DumperOptions;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.LoaderOptions;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.constructor.Constructor;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.Mark;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.MarkedYAMLException;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.YAMLException;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.nodes.Node;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.nodes.Tag;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.representer.Represent;
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.representer.Representer;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.snakeyaml.engine.v2.api.Dump;
+import org.snakeyaml.engine.v2.api.DumpSettings;
+import org.snakeyaml.engine.v2.api.Load;
+import org.snakeyaml.engine.v2.api.LoadSettings;
+import org.snakeyaml.engine.v2.common.FlowStyle;
+import org.snakeyaml.engine.v2.exceptions.Mark;
+import org.snakeyaml.engine.v2.exceptions.MarkedYamlEngineException;
+import org.snakeyaml.engine.v2.exceptions.YamlEngineException;
+import org.snakeyaml.engine.v2.nodes.Node;
+import org.snakeyaml.engine.v2.nodes.ScalarNode;
+import org.snakeyaml.engine.v2.nodes.Tag;
+import org.snakeyaml.engine.v2.representer.StandardRepresenter;
+import org.snakeyaml.engine.v2.schema.CoreSchema;
 
 import javax.annotation.Nonnull;
 
@@ -47,6 +48,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * This class contains utility methods to load standard yaml file and convert object to standard
@@ -56,26 +58,30 @@ public class YamlParserUtils {
 
     private static final Logger LOG = LoggerFactory.getLogger(YamlParserUtils.class);
 
-    private static final Yaml yaml;
+    private static final DumpSettings blockerDumperSettings =
+            DumpSettings.builder()
+                    .setDefaultFlowStyle(FlowStyle.BLOCK)
+                    // Disable splitting of long lines to avoid adding unexpected line breaks
+                    .setSplitLines(false)
+                    .setSchema(new CoreSchema())
+                    .build();
 
-    private static final DumperOptions dumperOptions = new DumperOptions();
+    private static final DumpSettings flowDumperSettings =
+            DumpSettings.builder()
+                    .setDefaultFlowStyle(FlowStyle.FLOW)
+                    // Disable splitting of long lines to avoid adding unexpected line breaks
+                    .setSplitLines(false)
+                    .setSchema(new CoreSchema())
+                    .build();
 
-    private static final LoaderOptions loaderOptions = new LoaderOptions();
+    private static final Dump blockerDumper =
+            new Dump(blockerDumperSettings, new FlinkConfigRepresenter(blockerDumperSettings));
 
-    static {
-        // Make the dump output is in single line
-        dumperOptions.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW);
-        dumperOptions.setWidth(Integer.MAX_VALUE);
-        // The standard YAML do not allow duplicate keys.
-        loaderOptions.setAllowDuplicateKeys(false);
+    private static final Dump flowDumper =
+            new Dump(flowDumperSettings, new FlinkConfigRepresenter(flowDumperSettings));
 
-        yaml =
-                new Yaml(
-                        new Constructor(loaderOptions),
-                        new FlinkConfigRepresenter(dumperOptions),
-                        dumperOptions,
-                        loaderOptions);
-    }
+    private static final Load loader =
+            new Load(LoadSettings.builder().setSchema(new CoreSchema()).build());
 
     /**
      * Loads the contents of the given YAML file into a map.
@@ -84,21 +90,22 @@ public class YamlParserUtils {
      * @return a non-null map representing the YAML content. If the file is empty or only contains
      *     comments, an empty map is returned.
      * @throws FileNotFoundException if the YAML file is not found.
-     * @throws YAMLException if the file cannot be parsed.
+     * @throws YamlEngineException if the file cannot be parsed.
      * @throws IOException if an I/O error occurs while reading from the file stream.
      */
     public static synchronized @Nonnull Map<String, Object> loadYamlFile(File file)
             throws Exception {
         try (FileInputStream inputStream = new FileInputStream((file))) {
-            Map<String, Object> yamlResult = yaml.load(inputStream);
+            Map<String, Object> yamlResult =
+                    (Map<String, Object>) loader.loadFromInputStream(inputStream);
             return yamlResult == null ? new HashMap<>() : yamlResult;
         } catch (FileNotFoundException e) {
             LOG.error("Failed to find YAML file", e);
             throw e;
-        } catch (IOException | YAMLException e) {
-            if (e instanceof MarkedYAMLException) {
-                YAMLException exception =
-                        wrapExceptionToHiddenSensitiveData((MarkedYAMLException) e);
+        } catch (IOException | YamlEngineException e) {
+            if (e instanceof MarkedYamlEngineException) {
+                YamlEngineException exception =
+                        wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) e);
                 LOG.error("Failed to parse YAML configuration", exception);
                 throw exception;
             } else {
@@ -122,14 +129,14 @@ public class YamlParserUtils {
      */
     public static synchronized String toYAMLString(Object value) {
         try {
-            String output = yaml.dump(value);
+            String output = flowDumper.dumpToString(value);
             // remove the line break
-            String linebreak = dumperOptions.getLineBreak().getString();
+            String linebreak = flowDumperSettings.getBestLineBreak();
             if (output.endsWith(linebreak)) {
                 output = output.substring(0, output.length() - linebreak.length());
             }
             return output;
-        } catch (MarkedYAMLException exception) {
+        } catch (MarkedYamlEngineException exception) {
             throw wrapExceptionToHiddenSensitiveData(exception);
         }
     }
@@ -159,18 +166,18 @@ public class YamlParserUtils {
                 }
                 currentMap.put(keys[keys.length - 1], entry.getValue());
             }
-            String data = yaml.dumpAsMap(nestedMap);
-            String linebreak = dumperOptions.getLineBreak().getString();
+            String data = blockerDumper.dumpToString(nestedMap);
+            String linebreak = blockerDumperSettings.getBestLineBreak();
             return Arrays.asList(data.split(linebreak));
-        } catch (MarkedYAMLException exception) {
+        } catch (MarkedYamlEngineException exception) {
             throw wrapExceptionToHiddenSensitiveData(exception);
         }
     }
 
     public static synchronized <T> T convertToObject(String value, Class<T> type) {
         try {
-            return yaml.loadAs(value, type);
-        } catch (MarkedYAMLException exception) {
+            return type.cast(loader.loadFromString(value));
+        } catch (MarkedYamlEngineException exception) {
             throw wrapExceptionToHiddenSensitiveData(exception);
         }
     }
@@ -199,44 +206,48 @@ public class YamlParserUtils {
      * in 'reader', line 2, column 1
      * }</pre>
      *
-     * @param exception The MarkedYAMLException containing potentially sensitive data.
-     * @return A YAMLException with a message that has sensitive data hidden.
+     * @param exception The MarkedYamlEngineException containing potentially sensitive data.
+     * @return A YamlEngineException with a message that has sensitive data hidden.
      */
-    private static YAMLException wrapExceptionToHiddenSensitiveData(MarkedYAMLException exception) {
+    private static YamlEngineException wrapExceptionToHiddenSensitiveData(
+            MarkedYamlEngineException exception) {
         StringBuilder lines = new StringBuilder();
         String context = exception.getContext();
-        Mark contextMark = exception.getContextMark();
+        Optional<Mark> contextMark = exception.getContextMark();
+        Optional<Mark> problemMark = exception.getProblemMark();
         String problem = exception.getProblem();
-        Mark problemMark = exception.getProblemMark();
 
         if (context != null) {
             lines.append(context);
             lines.append("\n");
         }
-        if (contextMark != null
+
+        if (contextMark.isPresent()
                 && (problem == null
-                        || problemMark == null
-                        || contextMark.getName().equals(problemMark.getName())
-                        || (contextMark.getLine() != problemMark.getLine())
-                        || (contextMark.getColumn() != problemMark.getColumn()))) {
-            lines.append(hiddenSensitiveDataInMark(contextMark));
+                        || !problemMark.isPresent()
+                        || contextMark.get().getName().equals(problemMark.get().getName())
+                        || contextMark.get().getLine() != problemMark.get().getLine()
+                        || contextMark.get().getColumn() != problemMark.get().getColumn())) {
+            lines.append(hiddenSensitiveDataInMark(contextMark.get()));
             lines.append("\n");
         }
+
         if (problem != null) {
             lines.append(problem);
             lines.append("\n");
         }
-        if (problemMark != null) {
-            lines.append(hiddenSensitiveDataInMark(problemMark));
+
+        if (problemMark.isPresent()) {
+            lines.append(hiddenSensitiveDataInMark(problemMark.get()));
             lines.append("\n");
         }
 
         Throwable cause = exception.getCause();
-        if (cause instanceof MarkedYAMLException) {
-            cause = wrapExceptionToHiddenSensitiveData((MarkedYAMLException) cause);
+        if (cause instanceof MarkedYamlEngineException) {
+            cause = wrapExceptionToHiddenSensitiveData((MarkedYamlEngineException) cause);
         }
 
-        YAMLException yamlException = new YAMLException(lines.toString(), cause);
+        YamlEngineException yamlException = new YamlEngineException(lines.toString(), cause);
         yamlException.setStackTrace(exception.getStackTrace());
         return yamlException;
     }
@@ -254,37 +265,27 @@ public class YamlParserUtils {
                 + (mark.getColumn() + 1);
     }
 
-    private static class FlinkConfigRepresenter extends Representer {
-        public FlinkConfigRepresenter(DumperOptions options) {
-            super(options);
-            representers.put(Duration.class, new RepresentDuration());
-            representers.put(MemorySize.class, new RepresentMemorySize());
-            multiRepresenters.put(Enum.class, new RepresentEnum());
+    private static class FlinkConfigRepresenter extends StandardRepresenter {
+        public FlinkConfigRepresenter(DumpSettings dumpSettings) {
+            super(dumpSettings);
+            representers.put(Duration.class, this::representDuration);
+            representers.put(MemorySize.class, this::representMemorySize);
+            parentClassRepresenters.put(Enum.class, this::representEnum);
         }
 
-        private class RepresentDuration implements Represent {
-            @Override
-            public Node representData(Object data) {
-                Duration duration = (Duration) data;
-                String durationString = TimeUtils.formatWithHighestUnit(duration);
-                return representScalar(getTag(duration.getClass(), Tag.STR), durationString, null);
-            }
+        private Node representDuration(Object data) {
+            Duration duration = (Duration) data;
+            String durationString = TimeUtils.formatWithHighestUnit(duration);
+            return new ScalarNode(Tag.STR, durationString, settings.getDefaultScalarStyle());
         }
 
-        private class RepresentMemorySize implements Represent {
-            @Override
-            public Node representData(Object data) {
-                MemorySize memorySize = (MemorySize) data;
-                return representScalar(
-                        getTag(memorySize.getClass(), Tag.STR), memorySize.toString(), null);
-            }
+        private Node representMemorySize(Object data) {
+            MemorySize memorySize = (MemorySize) data;
+            return new ScalarNode(Tag.STR, memorySize.toString(), settings.getDefaultScalarStyle());
         }
 
-        private class RepresentEnum implements Represent {
-            @Override
-            public Node representData(Object data) {
-                return representScalar(getTag(data.getClass(), Tag.STR), data.toString(), null);
-            }
+        private Node representEnum(Object data) {
+            return new ScalarNode(Tag.STR, data.toString(), settings.getDefaultScalarStyle());
         }
     }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java
index f857914238f..df22563c474 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/YamlParserUtilsTest.java
@@ -20,11 +20,10 @@ package org.apache.flink.configuration;
 
 import org.apache.flink.util.ExceptionUtils;
 
-import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.error.YAMLException;
-
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.snakeyaml.engine.v2.exceptions.YamlEngineException;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -75,6 +74,62 @@ class YamlParserUtilsTest {
         assertThat(yamlData.get("key7")).isEqualTo("true");
     }
 
+    /**
+     * Guards against unexpected behavior changes in Flink configuration parsing caused by
+     * differences between YAML 1.2 and its predecessor YAML 1.1. This test case is based on the
+     * YAML Changes page <a href="https://yaml.org/spec/1.2.2/ext/changes">YAML Changes</a>.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    void testYaml12Features() {
+        // In YAML 1.2, only true and false strings are parsed as booleans (including True and
+        // TRUE); y, yes, on, and their negative counterparts are parsed as strings.
+        String booleanRepresentation = "key1: Yes\n" + "key2: y\n" + "key3: on";
+        Map<String, String> expectedBooleanRepresentation = new HashMap<>();
+        expectedBooleanRepresentation.put(
+                "key1", "Yes"); // in YAML 1.1 the value would be parsed as Boolean.TRUE
+        expectedBooleanRepresentation.put(
+                "key2", "y"); // in YAML 1.1 the value would be parsed as Boolean.TRUE
+        expectedBooleanRepresentation.put(
+                "key3", "on"); // in YAML 1.1 the value would be parsed as Boolean.TRUE
+        assertThat(YamlParserUtils.convertToObject(booleanRepresentation, Map.class))
+                .containsAllEntriesOf(expectedBooleanRepresentation);
+
+        // In YAML 1.2, underscores '_' cannot be used within numerical values.
+        String underlineInNumber = "key1: 1_000";
+        assertThat(YamlParserUtils.convertToObject(underlineInNumber, Map.class))
+                .containsEntry(
+                        "key1",
+                        "1_000"); // In YAML 1.1, the expected value is number 1000 not a string.
+
+        // In YAML 1.2, Octal values need a 0o prefix; e.g. 010 is now parsed with the value 10
+        // rather than 8.
+        String octalNumber1 = "octal: 010";
+        assertThat(YamlParserUtils.convertToObject(octalNumber1, Map.class))
+                .containsEntry("octal", 10); // In YAML 1.1, the expected value is number 8.
+        String octalNumber2 = "octal: 0o10";
+        assertThat(YamlParserUtils.convertToObject(octalNumber2, Map.class))
+                .containsEntry("octal", 8);
+
+        // In YAML 1.2, the binary and sexagesimal integer formats have been dropped.
+        String binaryNumber = "binary: 0b101";
+        assertThat(YamlParserUtils.convertToObject(binaryNumber, Map.class))
+                .containsEntry(
+                        "binary",
+                        "0b101"); // In YAML 1.1, the expected value is number 5 not a string.
+        String sexagesimalNumber = "sexagesimal: 1:00";
+        assertThat(YamlParserUtils.convertToObject(sexagesimalNumber, Map.class))
+                .containsEntry(
+                        "sexagesimal",
+                        "1:00"); // In YAML 1.1, the expected value is number 60 not a string.
+
+        // In YAML 1.2, the !!pairs, !!omap, !!set, !!timestamp and !!binary types have been
+        // dropped.
+        String timestamp = "!!timestamp 2001-12-15T02:59:43.1Z";
+        assertThatThrownBy(() -> YamlParserUtils.convertToObject(timestamp, Object.class))
+                .isInstanceOf(YamlEngineException.class);
+    }
+
     @Test
     void testLoadEmptyYamlFile() throws Exception {
         File confFile = new File(tmpDir, "test.yaml");
@@ -92,7 +147,7 @@ class YamlParserUtilsTest {
             throw new RuntimeException(e);
         }
         assertThatThrownBy(() -> YamlParserUtils.loadYamlFile(confFile))
-                .isInstanceOf(YAMLException.class)
+                .isInstanceOf(YamlEngineException.class)
                 .satisfies(
                         e ->
                                 Assertions.assertThat(ExceptionUtils.stringifyException(e))
@@ -109,7 +164,7 @@ class YamlParserUtilsTest {
             throw new RuntimeException(e);
         }
         assertThatThrownBy(() -> YamlParserUtils.loadYamlFile(confFile))
-                .isInstanceOf(YAMLException.class)
+                .isInstanceOf(YamlEngineException.class)
                 .satisfies(
                         e ->
                                 Assertions.assertThat(ExceptionUtils.stringifyException(e))
diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE
index f5a1fc893c0..2eb8a611431 100644
--- a/flink-dist/src/main/resources/META-INF/NOTICE
+++ b/flink-dist/src/main/resources/META-INF/NOTICE
@@ -21,6 +21,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.objenesis:objenesis:2.1
 - org.xerial.snappy:snappy-java:1.1.10.4
 - tools.profiler:async-profiler:2.9
+- org.snakeyaml:snakeyaml-engine:2.6
 
 This project bundles the following dependencies under the BSD license.
 See bundled license files for details.
diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt
index f692a63f0f8..0e7d3fadb0c 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -32,4 +32,4 @@ pemja==0.4.1; platform_system != 'Windows'
 httplib2>=0.19.0
 protobuf>=3.19.0
 pytest~=7.0
-pyyaml>=6.0.1
+ruamel.yaml>=0.18.4
diff --git a/flink-python/pyflink/common/configuration.py b/flink-python/pyflink/common/configuration.py
index 35416b8fa47..779568da151 100644
--- a/flink-python/pyflink/common/configuration.py
+++ b/flink-python/pyflink/common/configuration.py
@@ -78,8 +78,9 @@ class Configuration:
     def parse_jars_value(value: str, jvm):
         is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
         if is_standard_yaml:
-            import yaml
-            jar_urls_list = yaml.safe_load(value)
+            from ruamel.yaml import YAML
+            yaml = YAML(typ='safe')
+            jar_urls_list = yaml.load(value)
             if isinstance(jar_urls_list, list):
                 return jar_urls_list
         return value.split(";")
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index 19fd24bf7e5..0eae1e961ee 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -44,7 +44,8 @@ def on_windows():
 
 
 def read_from_config(key, default_value, flink_conf_directory):
-    import yaml
+    from ruamel.yaml import YAML
+    yaml = YAML(typ='safe')
     # try to find flink-conf.yaml file in flink_conf_directory
     flink_conf_file = os.path.join(flink_conf_directory, "flink-conf.yaml")
     if os.path.isfile(flink_conf_file):
@@ -68,7 +69,7 @@ def read_from_config(key, default_value, flink_conf_directory):
         if os.path.isfile(config_file):
             # If config.yaml exists, use YAML parser to read the value
             with open(os.path.realpath(config_file), "r") as f:
-                config = yaml.safe_load(f)
+                config = yaml.load(f)
                 flat_config = flatten_config(config)
                 return flat_config.get(key, default_value)
 
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 24637fe4c7d..e5e1e231b22 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -326,7 +326,7 @@ try:
                         'pyarrow>=5.0.0',
                         'pemja==0.4.1;platform_system != "Windows"',
                         'httplib2>=0.19.0',
-                        'pyyaml>=6.0.1',
+                        'ruamel.yaml>=0.18.4',
                         apache_flink_libraries_dependency]
 
     setup(