You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/03/29 21:59:42 UTC
[flink] branch master updated: [FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when emitting `NULL`
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4c89959 [FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when emitting `NULL`
4c89959 is described below
commit 4c8995917885e301ca11023fb5e4eb3d0b7a0c7e
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Sun Mar 27 11:29:12 2022 +0300
[FLINK-26092][table-runtime] Fix `JSON_OBJECTAGG` when emitting `NULL`
Previously, when the Json aggregation is taking place, and if
JsonOnNull.NULL is selected, which means that we still want to emit
a `null` JSON node, i.e. `{.... "myField" : null ... }` when no values
get accumulated, we used a null `StringData` object. When
`state.backend.changelog.enabled` is enabled, the contents of the map
accumulating the aggregated records get serialized, leading to an NPE,
since `null` is not supported by `StringDataSerializer`.
To solve this, we instead create a StringData with an empty `byte[]`,
which denotes null, and when the aggregation ends and we create
the final JSON result, we check for a `byte[]` of `length` `0` in
order to write the JSON `null` node.
---
.../BuiltInAggregateFunctionTestBase.java | 39 ++++++++++++++++------
.../functions/aggregate/JsonObjectAggFunction.java | 7 ++--
2 files changed, 33 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
index 158d2f0..961b69a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BuiltInAggregateFunctionTestBase.java
@@ -18,7 +18,8 @@
package org.apache.flink.table.planner.functions;
-import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
@@ -56,6 +57,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.flink.runtime.state.StateBackendLoader.HASHMAP_STATE_BACKEND_NAME;
+import static org.apache.flink.runtime.state.StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;
import static org.apache.flink.table.test.TableAssertions.assertThat;
import static org.apache.flink.table.types.DataType.getFieldDataTypes;
import static org.assertj.core.api.Assertions.assertThat;
@@ -204,12 +207,16 @@ abstract class BuiltInAggregateFunctionTestBase {
return this;
}
- private Executable createTestItemExecutable(TestItem testItem) {
+ private Executable createTestItemExecutable(TestItem testItem, String stateBackend) {
return () -> {
+ Configuration conf = new Configuration();
+ conf.set(StateBackendOptions.STATE_BACKEND, stateBackend);
final TableEnvironment tEnv =
- TableEnvironment.create(EnvironmentSettings.inStreamingMode());
- // see https://issues.apache.org/jira/browse/FLINK-26092
- tEnv.getConfig().set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
+ TableEnvironment.create(
+ EnvironmentSettings.newInstance()
+ .inStreamingMode()
+ .withConfiguration(conf)
+ .build());
final Table sourceTable = asTable(tEnv, sourceRowType, sourceRows);
testItem.execute(tEnv, sourceTable);
@@ -217,12 +224,22 @@ abstract class BuiltInAggregateFunctionTestBase {
}
Stream<BuiltInFunctionTestBase.TestCase> getTestCases() {
- return testItems.stream()
- .map(
- testItem ->
- new BuiltInFunctionTestBase.TestCase(
- testItem.toString(),
- createTestItemExecutable(testItem)));
+ return Stream.concat(
+ testItems.stream()
+ .map(
+ testItem ->
+ new BuiltInFunctionTestBase.TestCase(
+ testItem.toString(),
+ createTestItemExecutable(
+ testItem, HASHMAP_STATE_BACKEND_NAME))),
+ testItems.stream()
+ .map(
+ testItem ->
+ new BuiltInFunctionTestBase.TestCase(
+ testItem.toString(),
+ createTestItemExecutable(
+ testItem,
+ ROCKSDB_STATE_BACKEND_NAME))));
}
@Override
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
index 8152f98..4be69c9 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java
@@ -57,6 +57,7 @@ public class JsonObjectAggFunction
extends BuiltInAggregateFunction<String, JsonObjectAggFunction.Accumulator> {
private static final long serialVersionUID = 1L;
+ private static final StringData NULL_STRING_DATA = StringData.fromBytes(new byte[] {});
private static final NullNode NULL_NODE = getNodeFactory().nullNode();
private final transient List<DataType> argumentTypes;
@@ -107,7 +108,9 @@ public class JsonObjectAggFunction
if (valueData == null) {
if (!skipNulls) {
- acc.map.put(keyData, null);
+ // We cannot use null for StringData here, since it's not supported by the
+ // StringDataSerializer, instead use a StringData with an empty byte[]
+ acc.map.put(keyData, NULL_STRING_DATA);
}
} else {
acc.map.put(keyData, valueData);
@@ -135,7 +138,7 @@ public class JsonObjectAggFunction
for (final StringData key : acc.map.keys()) {
final StringData value = acc.map.get(key);
final JsonNode valueNode =
- value == null
+ value.toBytes().length == 0
? NULL_NODE
: getNodeFactory().rawValueNode(new RawValue(value.toString()));