Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/25 18:37:35 UTC

[GitHub] [flink] Airblader opened a new pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Airblader opened a new pull request #17562:
URL: https://github.com/apache/flink/pull/17562


   ## What is the purpose of the change
   
   This introduces `JSON_ARRAYAGG` akin to `JSON_OBJECTAGG` from #17549.
   
   Note that this PR is based on #17549 and thus needs to be rebased once that PR is merged. I'm keeping it in draft until then.
   
   Supersedes #11370.
   
   ## Verifying this change
   
   * `JsonAggregationFunctionsITCase`
   * `WrapJsonAggFunctionArgumentsRuleTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? docs + JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25498",
       "triggerID" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "953528884",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 3a3daa5d043d9d67fbf13a432ac12fcaf2593fde Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] Airblader commented on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
Airblader commented on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-953528884


   @flinkbot run azure





[GitHub] [flink] twalthr commented on a change in pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #17562:
URL: https://github.com/apache/flink/pull/17562#discussion_r736487847



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
+import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue;
+
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.createArrayNode;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.getNodeFactory;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.serializeJson;
+
+/**
+ * Implementation for {@link BuiltInFunctionDefinitions#JSON_ARRAYAGG_ABSENT_ON_NULL} / {@link
+ * BuiltInFunctionDefinitions#JSON_ARRAYAGG_NULL_ON_NULL}.
+ *
+ * <p>Note that this function only ever receives strings to accumulate because {@link
+ * WrapJsonAggFunctionArgumentsRule} wraps arguments into {@link
+ * BuiltInFunctionDefinitions#JSON_STRING}.
+ */
+@Internal
+public class JsonArrayAggFunction
+        extends BuiltInAggregateFunction<String, JsonArrayAggFunction.Accumulator> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Marker that represents a {@code null} since {@link ListView} does not allow {@code null}s.
+     *
+     * <p>Note that due to {@link WrapJsonAggFunctionArgumentsRule} and the fact that this function
+     * already only receives JSON strings, this value cannot be created by the user and is thus safe
+     * to use.
+     */
+    private static final StringData NULL_STR = StringData.fromString("null");
+
+    private final transient List<DataType> argumentTypes;
+    private final SqlJsonConstructorNullClause onNull;
+
+    public JsonArrayAggFunction(LogicalType[] argumentTypes, SqlJsonConstructorNullClause onNull) {

Review comment:
       Let's make this a boolean flag so that runtime code has no Calcite dependencies.
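
One way to read this suggestion, as a minimal sketch: the two `ON NULL` behaviors can be captured by a single boolean, so the function no longer needs `SqlJsonConstructorNullClause`. The class `NullHandlingSketch`, the field name `skipNulls`, and the use of plain `String` and `List` in place of `StringData` and `ListView` are assumptions made for illustration; this is not the code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the aggregate function; not the Flink class. */
final class NullHandlingSketch {
    /** Marker for a JSON null, mirroring NULL_STR in the diff above. */
    private static final String NULL_STR = "null";

    /** true corresponds to ABSENT ON NULL, false to NULL ON NULL (assumed naming). */
    private final boolean skipNulls;

    private final List<String> items = new ArrayList<>();

    NullHandlingSketch(boolean skipNulls) {
        this.skipNulls = skipNulls;
    }

    /** Adds an already-serialized JSON value, or handles a SQL NULL per the flag. */
    void accumulate(String item) {
        if (item == null) {
            if (!skipNulls) {
                items.add(NULL_STR);
            }
        } else {
            items.add(item);
        }
    }

    /** Joins the raw JSON values into a JSON array string. */
    String getValue() {
        return "[" + String.join(",", items) + "]";
    }
}
```

With a boolean there is no third enum value to guard against, so the `default` branch that throws a `TableException` in the diff would no longer be needed.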

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
+import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue;
+
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.createArrayNode;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.getNodeFactory;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.serializeJson;
+
+/**
+ * Implementation for {@link BuiltInFunctionDefinitions#JSON_ARRAYAGG_ABSENT_ON_NULL} / {@link
+ * BuiltInFunctionDefinitions#JSON_ARRAYAGG_NULL_ON_NULL}.
+ *
+ * <p>Note that this function only ever receives strings to accumulate because {@link
+ * WrapJsonAggFunctionArgumentsRule} wraps arguments into {@link
+ * BuiltInFunctionDefinitions#JSON_STRING}.
+ */
+@Internal
+public class JsonArrayAggFunction
+        extends BuiltInAggregateFunction<String, JsonArrayAggFunction.Accumulator> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Marker that represents a {@code null} since {@link ListView} does not allow {@code null}s.
+     *
+     * <p>Note that due to {@link WrapJsonAggFunctionArgumentsRule} and the fact that this function
+     * already only receives JSON strings, this value cannot be created by the user and is thus safe
+     * to use.
+     */
+    private static final StringData NULL_STR = StringData.fromString("null");
+
+    private final transient List<DataType> argumentTypes;
+    private final SqlJsonConstructorNullClause onNull;
+
+    public JsonArrayAggFunction(LogicalType[] argumentTypes, SqlJsonConstructorNullClause onNull) {
+        this.argumentTypes =
+                Arrays.stream(argumentTypes)
+                        .map(DataTypeUtils::toInternalDataType)
+                        .collect(Collectors.toList());
+
+        this.onNull = onNull;
+    }
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return argumentTypes;
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.STRING();
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                Accumulator.class,
+                DataTypes.FIELD("list", ListView.newListViewDataType(DataTypes.STRING())));
+    }
+
+    @Override
+    public Accumulator createAccumulator() {
+        return new Accumulator();
+    }
+
+    public void resetAccumulator(Accumulator acc) {
+        acc.list.clear();
+    }
+
+    public void accumulate(Accumulator acc, StringData itemData) throws Exception {
+        if (itemData == null) {
+            switch (onNull) {
+                case NULL_ON_NULL:
+                    acc.list.add(NULL_STR);
+                    break;
+                case ABSENT_ON_NULL:
+                    break;
+                default:
+                    throw new TableException(
+                            String.format("Unsupported ON NULL behavior: %s", onNull));
+            }
+        } else {
+            acc.list.add(itemData);
+        }
+    }
+
+    public void retract(Accumulator acc, StringData itemData) throws Exception {
+        if (itemData == null) {
+            acc.list.remove(NULL_STR);
+        } else {
+            acc.list.remove(itemData);
+        }
+    }
+
+    public void merge(Accumulator acc, Iterable<Accumulator> others) throws Exception {
+        for (final Accumulator other : others) {
+            acc.list.addAll(other.list.getList());
+        }
+    }
+
+    @Override
+    public String getValue(Accumulator acc) {
+        final ArrayNode rootNode = createArrayNode();
+        try {
+            for (final StringData item : acc.list.get()) {
+                final JsonNode itemNode =
+                        getNodeFactory().rawValueNode(new RawValue(item.toString()));

Review comment:
       Can we also sort the list somehow? Otherwise `merge` might behave non-deterministically.
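
The concern can be illustrated with a small sketch. It assumes that `merge` simply appends partial lists in the order they arrive, as the `addAll` loop in the diff does; `MergeOrderSketch` and its plain `List<String>` partials are hypothetical stand-ins for the accumulator and its `ListView`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of order-dependent merging; not the Flink class. */
final class MergeOrderSketch {
    /** Appends every partial list to a fresh result, in the order the partials arrive. */
    static List<String> merge(List<List<String>> partials) {
        List<String> result = new ArrayList<>();
        for (List<String> partial : partials) {
            result.addAll(partial);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> first = Arrays.asList("1", "2");
        List<String> second = Arrays.asList("3");
        // The same two partial results, delivered in a different order,
        // produce a different final list.
        System.out.println(merge(Arrays.asList(first, second)));
        System.out.println(merge(Arrays.asList(second, first)));
    }
}
```

Because the arrival order of partial accumulators is not fixed, the same input rows could yield different arrays from run to run unless the items are ordered before serialization.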

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
+import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue;
+
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.createArrayNode;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.getNodeFactory;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.serializeJson;
+
+/**
+ * Implementation for {@link BuiltInFunctionDefinitions#JSON_ARRAYAGG_ABSENT_ON_NULL} / {@link
+ * BuiltInFunctionDefinitions#JSON_ARRAYAGG_NULL_ON_NULL}.
+ *
+ * <p>Note that this function only ever receives strings to accumulate because {@link
+ * WrapJsonAggFunctionArgumentsRule} wraps arguments into {@link
+ * BuiltInFunctionDefinitions#JSON_STRING}.
+ */
+@Internal
+public class JsonArrayAggFunction
+        extends BuiltInAggregateFunction<String, JsonArrayAggFunction.Accumulator> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Marker that represents a {@code null} since {@link ListView} does not allow {@code null}s.
+     *
+     * <p>Note that due to {@link WrapJsonAggFunctionArgumentsRule} and the fact that this function
+     * already only receives JSON strings, this value cannot be created by the user and is thus safe
+     * to use.
+     */
+    private static final StringData NULL_STR = StringData.fromString("null");
+
+    private final transient List<DataType> argumentTypes;
+    private final SqlJsonConstructorNullClause onNull;
+
+    public JsonArrayAggFunction(LogicalType[] argumentTypes, SqlJsonConstructorNullClause onNull) {
+        this.argumentTypes =
+                Arrays.stream(argumentTypes)
+                        .map(DataTypeUtils::toInternalDataType)
+                        .collect(Collectors.toList());
+
+        this.onNull = onNull;
+    }
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return argumentTypes;
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.STRING();
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                Accumulator.class,
+                DataTypes.FIELD("list", ListView.newListViewDataType(DataTypes.STRING())));

Review comment:
       `STRING().bridgedTo(StringData.class)`







[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25498",
       "triggerID" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25498) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7a90ad8e577b79b3f68fbed12a5824f6b822c129 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434) 
   * dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25498",
       "triggerID" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7a90ad8e577b79b3f68fbed12a5824f6b822c129 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434) 
   * dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25498) 
   







[GitHub] [flink] Airblader commented on a change in pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
Airblader commented on a change in pull request #17562:
URL: https://github.com/apache/flink/pull/17562#discussion_r736516990



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
+import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue;
+
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.createArrayNode;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.getNodeFactory;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.serializeJson;
+
+/**
+ * Implementation for {@link BuiltInFunctionDefinitions#JSON_ARRAYAGG_ABSENT_ON_NULL} / {@link
+ * BuiltInFunctionDefinitions#JSON_ARRAYAGG_NULL_ON_NULL}.
+ *
+ * <p>Note that this function only ever receives strings to accumulate because {@link
+ * WrapJsonAggFunctionArgumentsRule} wraps arguments into {@link
+ * BuiltInFunctionDefinitions#JSON_STRING}.
+ */
+@Internal
+public class JsonArrayAggFunction
+        extends BuiltInAggregateFunction<String, JsonArrayAggFunction.Accumulator> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Marker that represents a {@code null} since {@link ListView} does not allow {@code null}s.
+     *
+     * <p>Note that due to {@link WrapJsonAggFunctionArgumentsRule} and the fact that this function
+     * already only receives JSON strings, this value cannot be created by the user and is thus safe
+     * to use.
+     */
+    private static final StringData NULL_STR = StringData.fromString("null");
+
+    private final transient List<DataType> argumentTypes;
+    private final SqlJsonConstructorNullClause onNull;
+
+    public JsonArrayAggFunction(LogicalType[] argumentTypes, SqlJsonConstructorNullClause onNull) {
+        this.argumentTypes =
+                Arrays.stream(argumentTypes)
+                        .map(DataTypeUtils::toInternalDataType)
+                        .collect(Collectors.toList());
+
+        this.onNull = onNull;
+    }
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return argumentTypes;
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.STRING();
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                Accumulator.class,
+                DataTypes.FIELD("list", ListView.newListViewDataType(DataTypes.STRING())));
+    }
+
+    @Override
+    public Accumulator createAccumulator() {
+        return new Accumulator();
+    }
+
+    public void resetAccumulator(Accumulator acc) {
+        acc.list.clear();
+    }
+
+    public void accumulate(Accumulator acc, StringData itemData) throws Exception {
+        if (itemData == null) {
+            switch (onNull) {
+                case NULL_ON_NULL:
+                    acc.list.add(NULL_STR);
+                    break;
+                case ABSENT_ON_NULL:
+                    break;
+                default:
+                    throw new TableException(
+                            String.format("Unsupported ON NULL behavior: %s", onNull));
+            }
+        } else {
+            acc.list.add(itemData);
+        }
+    }
+
+    public void retract(Accumulator acc, StringData itemData) throws Exception {
+        if (itemData == null) {
+            acc.list.remove(NULL_STR);
+        } else {
+            acc.list.remove(itemData);
+        }
+    }
+
+    public void merge(Accumulator acc, Iterable<Accumulator> others) throws Exception {
+        for (final Accumulator other : others) {
+            acc.list.addAll(other.list.getList());
+        }
+    }
+
+    @Override
+    public String getValue(Accumulator acc) {
+        final ArrayNode rootNode = createArrayNode();
+        try {
+            for (final StringData item : acc.list.get()) {
+                final JsonNode itemNode =
+                        getNodeFactory().rawValueNode(new RawValue(item.toString()));

Review comment:
       Hm, this is actually a problem. In JSON_OBJECTAGG we sorted the keys, and that's fine, but arrays are order-sensitive. Maybe we shouldn't support `merge` at all? We'd need `WITHIN GROUP` here, but I'm guessing Calcite doesn't support that keyword there yet (will have to check, though).
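
The difference between the two aggregates can be sketched with Java collections as an analogy: a `Map` compares equal regardless of insertion order, much like a JSON object, while a `List` does not, much like a JSON array. The class `OrderSensitivitySketch` and its helper methods are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical analogy: Map stands in for a JSON object, List for a JSON array. */
final class OrderSensitivitySketch {
    /** Builds a map whose iteration order follows the given key order. */
    static Map<String, Integer> objectOf(String... keys) {
        Map<String, Integer> map = new LinkedHashMap<>();
        for (String key : keys) {
            map.put(key, key.length());
        }
        return map;
    }

    /** Map equality ignores entry order, so reordering keys keeps the same object. */
    static boolean sameObject(Map<String, Integer> left, Map<String, Integer> right) {
        return left.equals(right);
    }

    /** List equality is positional, so reordering elements gives a different array. */
    static boolean sameArray(List<String> left, List<String> right) {
        return left.equals(right);
    }

    public static void main(String[] args) {
        System.out.println(sameObject(objectOf("a", "bb"), objectOf("bb", "a")));
        System.out.println(sameArray(Arrays.asList("a", "bb"), Arrays.asList("bb", "a")));
    }
}
```

Sorting keys therefore only normalizes the representation of an object, whereas sorting array items would replace whatever order the input had with a different one.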







[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25498",
       "triggerID" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "953528884",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 3a3daa5d043d9d67fbf13a432ac12fcaf2593fde Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7a90ad8e577b79b3f68fbed12a5824f6b822c129 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25498",
       "triggerID" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 3a3daa5d043d9d67fbf13a432ac12fcaf2593fde Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] twalthr commented on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
twalthr commented on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-953148532


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25498",
       "triggerID" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537",
       "triggerID" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 3a3daa5d043d9d67fbf13a432ac12fcaf2593fde Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25537) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] Airblader commented on a change in pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
Airblader commented on a change in pull request #17562:
URL: https://github.com/apache/flink/pull/17562#discussion_r735863664



##########
File path: docs/data/sql_functions.yml
##########
@@ -644,6 +644,27 @@ json:
       SELECT JSON_EXISTS('{"a": true}',
         'strict $.b' FALSE ON ERROR);
       ```
+  - sql: JSON_STRING(value)

Review comment:
       I moved `JSON_STRING` up a bit because it makes more sense here than in between the object/array functions.







[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7a90ad8e577b79b3f68fbed12a5824f6b822c129 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] Airblader commented on a change in pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
Airblader commented on a change in pull request #17562:
URL: https://github.com/apache/flink/pull/17562#discussion_r737159905



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/JsonArrayAggFunction.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.planner.plan.rules.logical.WrapJsonAggFunctionArgumentsRule;
+import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue;
+
+import org.apache.calcite.sql.SqlJsonConstructorNullClause;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.createArrayNode;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.getNodeFactory;
+import static org.apache.flink.table.runtime.functions.SqlJsonUtils.serializeJson;
+
+/**
+ * Implementation for {@link BuiltInFunctionDefinitions#JSON_ARRAYAGG_ABSENT_ON_NULL} / {@link
+ * BuiltInFunctionDefinitions#JSON_ARRAYAGG_NULL_ON_NULL}.
+ *
+ * <p>Note that this function only ever receives strings to accumulate because {@link
+ * WrapJsonAggFunctionArgumentsRule} wraps arguments into {@link
+ * BuiltInFunctionDefinitions#JSON_STRING}.
+ */
+@Internal
+public class JsonArrayAggFunction
+        extends BuiltInAggregateFunction<String, JsonArrayAggFunction.Accumulator> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Marker that represents a {@code null} since {@link ListView} does not allow {@code null}s.
+     *
+     * <p>Note that due to {@link WrapJsonAggFunctionArgumentsRule} and the fact that this function
+     * already only receives JSON strings, this value cannot be created by the user and is thus safe
+     * to use.
+     */
+    private static final StringData NULL_STR = StringData.fromString("null");
+
+    private final transient List<DataType> argumentTypes;
+    private final SqlJsonConstructorNullClause onNull;
+
+    public JsonArrayAggFunction(LogicalType[] argumentTypes, SqlJsonConstructorNullClause onNull) {
+        this.argumentTypes =
+                Arrays.stream(argumentTypes)
+                        .map(DataTypeUtils::toInternalDataType)
+                        .collect(Collectors.toList());
+
+        this.onNull = onNull;
+    }
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return argumentTypes;
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return DataTypes.STRING();
+    }
+
+    @Override
+    public DataType getAccumulatorDataType() {
+        return DataTypes.STRUCTURED(
+                Accumulator.class,
+                DataTypes.FIELD("list", ListView.newListViewDataType(DataTypes.STRING())));
+    }
+
+    @Override
+    public Accumulator createAccumulator() {
+        return new Accumulator();
+    }
+
+    public void resetAccumulator(Accumulator acc) {
+        acc.list.clear();
+    }
+
+    public void accumulate(Accumulator acc, StringData itemData) throws Exception {
+        if (itemData == null) {
+            switch (onNull) {
+                case NULL_ON_NULL:
+                    acc.list.add(NULL_STR);
+                    break;
+                case ABSENT_ON_NULL:
+                    break;
+                default:
+                    throw new TableException(
+                            String.format("Unsupported ON NULL behavior: %s", onNull));
+            }
+        } else {
+            acc.list.add(itemData);
+        }
+    }
+
+    public void retract(Accumulator acc, StringData itemData) throws Exception {
+        if (itemData == null) {
+            acc.list.remove(NULL_STR);
+        } else {
+            acc.list.remove(itemData);
+        }
+    }
+
+    public void merge(Accumulator acc, Iterable<Accumulator> others) throws Exception {
+        for (final Accumulator other : others) {
+            acc.list.addAll(other.list.getList());
+        }
+    }
+
+    @Override
+    public String getValue(Accumulator acc) {
+        final ArrayNode rootNode = createArrayNode();
+        try {
+            for (final StringData item : acc.list.get()) {
+                final JsonNode itemNode =
+                        getNodeFactory().rawValueNode(new RawValue(item.toString()));

Review comment:
       I removed `merge` for now, documented the limitation and raised [FLINK-24664](https://issues.apache.org/jira/browse/FLINK-24664). Since we're facing the same issue for FIRST_VALUE / LAST_VALUE (and in theory also e.g. LISTAGG), I've also cross-linked them.
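
       For context on why an explicit ordering would lift the limitation, a hedged sketch follows (plain Java, not Flink code and not what this PR or the linked ticket implements). It assumes, hypothetically, that every accumulated item carried an ordering key, roughly what a `WITHIN GROUP (ORDER BY ...)` clause would supply; `OrderedMergeSketch`, `Keyed` and `mergeOrdered` are made-up names:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Standalone illustration; not Flink code. All names here are hypothetical. */
public class OrderedMergeSketch {

    /** A raw JSON item paired with a hypothetical explicit ordering key. */
    static final class Keyed {
        final long orderKey;
        final String rawJson;

        Keyed(long orderKey, String rawJson) {
            this.orderKey = orderKey;
            this.rawJson = rawJson;
        }
    }

    /** Concatenates the partial lists, then sorts by key so arrival order stops mattering. */
    static String mergeOrdered(List<List<Keyed>> partials) {
        List<Keyed> merged = new ArrayList<>();
        for (List<Keyed> partial : partials) {
            merged.addAll(partial);
        }
        merged.sort(Comparator.comparingLong((Keyed k) -> k.orderKey));

        List<String> items = new ArrayList<>();
        for (Keyed k : merged) {
            items.add(k.rawJson);
        }
        return "[" + String.join(",", items) + "]";
    }

    public static void main(String[] args) {
        List<Keyed> a = List.of(new Keyed(1, "\"x\""), new Keyed(3, "null"));
        List<Keyed> b = List.of(new Keyed(2, "{\"k\":1}"));

        // Both merge orders print the same array.
        System.out.println(mergeOrdered(List.of(a, b)));
        System.out.println(mergeOrdered(List.of(b, a)));
    }
}
```

       Without such a key the accumulators carry no information to restore the order from, which is why dropping `merge` is the conservative choice for now.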







[GitHub] [flink] flinkbot edited a comment on pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17562:
URL: https://github.com/apache/flink/pull/17562#issuecomment-951200622


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25434",
       "triggerID" : "7a90ad8e577b79b3f68fbed12a5824f6b822c129",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25498",
       "triggerID" : "dc21e87ad1ed66a9ce41d2a93e52bd5a7108bc82",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "953148532",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "3a3daa5d043d9d67fbf13a432ac12fcaf2593fde",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 3a3daa5d043d9d67fbf13a432ac12fcaf2593fde UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] twalthr closed pull request #17562: [FLINK-16206][table-planner] Support JSON_ARRAYAGG

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #17562:
URL: https://github.com/apache/flink/pull/17562


   

