You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/29 21:59:42 UTC

[flink] branch master updated: [FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when emitting `NULL`

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c89959  [FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when emitting `NULL`
4c89959 is described below

commit 4c8995917885e301ca11023fb5e4eb3d0b7a0c7e
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Sun Mar 27 11:29:12 2022 +0300

    [FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when emitting `NULL`
    
    Previously, when the JSON aggregation is taking place, and if
    JsonOnNull.NULL is selected, which means that we still want to emit
    a `null` JSON node, i.e. `{.... "myField" : null ... }` when no values
    get accumulated, we used a null `StringData` object. When
    `state.backend.changelog.enabled` is enabled, the contents of the map
    accumulating the aggregated records get serialized, leading to an NPE,
    since `null` is not supported by `StringDataSerializer`.
    
    To solve this, we instead create a StringData with an empty `byte[]`,
    which denotes a null value, and when the aggregation ends and we create
    the final JSON result, we check for a `byte[]` of length `0` in
    order to write the JSON `null` node.
---
 .../BuiltInAggregateFunctionTestBase.java          | 39 ++++++++++++++++------
 .../functions/aggregate/JsonObjectAggFunction.java |  7 ++--
 2 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
index 158d2f0..961b69a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.planner.functions;
 
-import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
@@ -56,6 +57,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.runtime.state.StateBackendLoader.HASHMAP_STATE_BACKEND_NAME;
+import static org.apache.flink.runtime.state.StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;
 import static org.apache.flink.table.test.TableAssertions.assertThat;
 import static org.apache.flink.table.types.DataType.getFieldDataTypes;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -204,12 +207,16 @@ abstract class BuiltInAggregateFunctionTestBase {
             return this;
         }
 
-        private Executable createTestItemExecutable(TestItem testItem) {
+        private Executable createTestItemExecutable(TestItem testItem, String stateBackend) {
             return () -> {
+                Configuration conf = new Configuration();
+                conf.set(StateBackendOptions.STATE_BACKEND, stateBackend);
                 final TableEnvironment tEnv =
-                        TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-                // see https://issues.apache.org/jira/browse/FLINK-26092
-                tEnv.getConfig().set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
+                        TableEnvironment.create(
+                                EnvironmentSettings.newInstance()
+                                        .inStreamingMode()
+                                        .withConfiguration(conf)
+                                        .build());
                 final Table sourceTable = asTable(tEnv, sourceRowType, sourceRows);
 
                 testItem.execute(tEnv, sourceTable);
@@ -217,12 +224,22 @@ abstract class BuiltInAggregateFunctionTestBase {
         }
 
         Stream<BuiltInFunctionTestBase.TestCase> getTestCases() {
-            return testItems.stream()
-                    .map(
-                            testItem ->
-                                    new BuiltInFunctionTestBase.TestCase(
-                                            testItem.toString(),
-                                            createTestItemExecutable(testItem)));
+            return Stream.concat(
+                    testItems.stream()
+                            .map(
+                                    testItem ->
+                                            new BuiltInFunctionTestBase.TestCase(
+                                                    testItem.toString(),
+                                                    createTestItemExecutable(
+                                                            testItem, HASHMAP_STATE_BACKEND_NAME))),
+                    testItems.stream()
+                            .map(
+                                    testItem ->
+                                            new BuiltInFunctionTestBase.TestCase(
+                                                    testItem.toString(),
+                                                    createTestItemExecutable(
+                                                            testItem,
+                                                            ROCKSDB_STATE_BACKEND_NAME))));
         }
 
         @Override
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
index 8152f98..4be69c9 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
@@ -57,6 +57,7 @@ public class JsonObjectAggFunction
         extends BuiltInAggregateFunction<String, JsonObjectAggFunction.Accumulator> {
 
     private static final long serialVersionUID = 1L;
+    private static final StringData NULL_STRING_DATA = StringData.fromBytes(new byte[] {});
     private static final NullNode NULL_NODE = getNodeFactory().nullNode();
 
     private final transient List<DataType> argumentTypes;
@@ -107,7 +108,9 @@ public class JsonObjectAggFunction
 
         if (valueData == null) {
             if (!skipNulls) {
-                acc.map.put(keyData, null);
+                // We cannot use null for StringData here, since it's not supported by the
+                // StringDataSerializer, instead use a StringData with an empty byte[]
+                acc.map.put(keyData, NULL_STRING_DATA);
             }
         } else {
             acc.map.put(keyData, valueData);
@@ -135,7 +138,7 @@ public class JsonObjectAggFunction
             for (final StringData key : acc.map.keys()) {
                 final StringData value = acc.map.get(key);
                 final JsonNode valueNode =
-                        value == null
+                        value.toBytes().length == 0
                                 ? NULL_NODE
                                 : getNodeFactory().rawValueNode(new RawValue(value.toString()));