Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/02 11:29:16 UTC

[GitHub] [flink] snuyanzin opened a new pull request, #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

snuyanzin opened a new pull request, #19623:
URL: https://github.com/apache/flink/pull/19623

   ## What is the purpose of the change
   This is an implementation of `ARRAY_DISTINCT` inspired by https://github.com/apache/flink/pull/19543
   
   ## Brief change log
   
     - ARRAY_DISTINCT for Table API and SQL
   
   
   ## Verifying this change
   
   This change added tests in `CollectionFunctionsITCase` 
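
As a rough illustration of the behavior such tests would exercise, here is a plain-Java model of the semantics suggested by the `eval` method quoted later in this thread: a null array yields null, and duplicates are dropped while first occurrences keep their order. This is only a sketch. The class name `ArrayDistinctModel` is hypothetical, `List` stands in for Flink's `ArrayData`, and it assumes the result is built from the `LinkedHashSet` in iteration order, which the truncated diff does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

final class ArrayDistinctModel {

    /** Models ARRAY_DISTINCT on a plain list: null in, null out; keeps first occurrences. */
    static <T> List<T> arrayDistinct(List<T> haystack) {
        if (haystack == null) {
            return null;
        }
        // LinkedHashSet preserves insertion order and accepts a single null element.
        return new ArrayList<>(new LinkedHashSet<>(haystack));
    }

    public static void main(String[] args) {
        System.out.println(arrayDistinct(Arrays.asList(1, 2, 2, null, 1, null))); // [1, 2, null]
        System.out.println(arrayDistinct(null)); // null
    }
}
```

Note that this model relies on `Object#equals` and `hashCode` of the elements, which is the point the review comments below call into question for structured types.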
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] twalthr commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by "twalthr (via GitHub)" <gi...@apache.org>.
twalthr commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1117290234


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_DISTINCT}. */
+@Internal
+public class ArrayDistinctFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+
+    public ArrayDistinctFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_DISTINCT, context);
+        final DataType dataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType();
+        elementGetter = ArrayData.createElementGetter(dataType.getLogicalType());
+    }
+
+    public @Nullable ArrayData eval(ArrayData haystack) {
+        try {
+            if (haystack == null) {
+                return null;
+            }
+            Set set = new LinkedHashSet<>();
+            final int size = haystack.size();
+            for (int pos = 0; pos < size; pos++) {
+                final Object element = elementGetter.getElementOrNull(haystack, pos);
+                set.add(element);

Review Comment:
   I don’t have the time to find a bug, but I just wanted to point out the existence of the expression evaluator to get access to primitive functions such as CAST or EQUALS. CC @luoyuxia 





[GitHub] [flink] snuyanzin commented on pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on PR #19623:
URL: https://github.com/apache/flink/pull/19623#issuecomment-1177360455

   @dianfu since you recently merged several functions, could you please have a look here?




[GitHub] [flink] snuyanzin commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1107075469


##########
docs/data/sql_functions.yml:
##########
@@ -614,6 +614,9 @@ collection:
   - sql: ARRAY_CONTAINS(haystack, needle)
     table: haystack.arrayContains(needle)
     description: Returns whether the given element exists in an array. Checking for null elements in the array is supported. If the array itself is null, the function will return null. The given element is cast implicitly to the array's element type if necessary.
+  - sql: ARRAY_DISTINCT(haystack)
+    table: haystack.arrayDistinct()

Review Comment:
   Thanks, done





[GitHub] [flink] twalthr commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by "twalthr (via GitHub)" <gi...@apache.org>.
twalthr commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1117290234


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java:
##########

Review Comment:
   I don’t have the time to find a bug, but I just wanted to point out the existence of the expression evaluator to get access to primitive functions such as CAST or EQUALS. CC @liuyongvs 





[GitHub] [flink] dianfu commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by "dianfu (via GitHub)" <gi...@apache.org>.
dianfu commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1107056665


##########
docs/data/sql_functions.yml:
##########
@@ -614,6 +614,9 @@ collection:
   - sql: ARRAY_CONTAINS(haystack, needle)
     table: haystack.arrayContains(needle)
     description: Returns whether the given element exists in an array. Checking for null elements in the array is supported. If the array itself is null, the function will return null. The given element is cast implicitly to the array's element type if necessary.
+  - sql: ARRAY_DISTINCT(haystack)
+    table: haystack.arrayDistinct()

Review Comment:
   This file also needs to be updated: https://raw.githubusercontent.com/apache/flink/master/flink-python/docs/reference/pyflink.table/expressions.rst





[GitHub] [flink] flinkbot commented on pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19623:
URL: https://github.com/apache/flink/pull/19623#issuecomment-1114748850

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "4fa86442276e3bb944d33c3eea7e1440efb8a9bc",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4fa86442276e3bb944d33c3eea7e1440efb8a9bc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4fa86442276e3bb944d33c3eea7e1440efb8a9bc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] liuyongvs commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1118251199


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java:
##########

Review Comment:
   Hi @twalthr, thanks for reminding us. I have a question about `ARRAY_CONTAINS`, though: why are the element and needle types declared as NOT NULL? This design is confusing for developers, and it causes a bug; see https://github.com/apache/flink/pull/21993
   
           equalityEvaluator =
                   context.createEvaluator(
                           $("element").isEqual($("needle")),
                           DataTypes.BOOLEAN(),
                           DataTypes.FIELD("element", needleDataType.notNull().toInternal()),
                           DataTypes.FIELD("needle", needleDataType.notNull().toInternal()));





[GitHub] [flink] liuyongvs commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1118435014


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java:
##########

Review Comment:
   Hi @twalthr @snuyanzin, I submitted a PR to fix this: https://github.com/apache/flink/pull/22030





[GitHub] [flink] dianfu closed pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by "dianfu (via GitHub)" <gi...@apache.org>.
dianfu closed pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function
URL: https://github.com/apache/flink/pull/19623




[GitHub] [flink] twalthr commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by "twalthr (via GitHub)" <gi...@apache.org>.
twalthr commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1117286705


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java:
##########

Review Comment:
   @snuyanzin We need to be careful with the equality of `RowData`, `MapData`, and others. I'm not saying that this implementation is wrong; it may work for most cases. But I'm also not sure whether there is a hidden bug.
   
   If you look at `BinaryRowData`, it compares binary sections, which means that equality is not based on SQL semantics. This is actually why I added `SpecializedFunction` with an `ExpressionEvaluatorFactory`. Only with this piece is it now possible to call the SQL-compliant equals function, which can be used to assemble a proper distinct for all data types.
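
The concern can be illustrated with a plain-Java analogy (this is not Flink code). In the sketch below, `byte[]` stands in for any element type whose Java `equals` does not match value equality, so a `LinkedHashSet` fails to collapse duplicates, while a distinct driven by an explicit equality predicate does. The class `DistinctByEquality` and the method `distinctBy` are hypothetical names; in Flink the predicate would presumably be backed by the expression evaluator mentioned above, which is an assumption here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.BiPredicate;

final class DistinctByEquality {

    /**
     * Keeps the first occurrence of each element, comparing with the supplied
     * equality instead of Object#equals. Nulls are matched only against nulls
     * and never reach the predicate. Quadratic, because no hash is assumed.
     */
    static <T> List<T> distinctBy(List<T> input, BiPredicate<T, T> equality) {
        List<T> result = new ArrayList<>();
        for (T candidate : input) {
            boolean seen = false;
            for (T kept : result) {
                boolean same =
                        (kept == null || candidate == null)
                                ? kept == candidate
                                : equality.test(kept, candidate);
                if (same) {
                    seen = true;
                    break;
                }
            }
            if (!seen) {
                result.add(candidate);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<byte[]> values = Arrays.asList(new byte[] {1, 2}, new byte[] {1, 2}, null, null);
        // Arrays use identity-based equals/hashCode, so both arrays survive: prints 3.
        System.out.println(new LinkedHashSet<>(values).size());
        // With content-based equality the duplicate array is dropped: prints 2.
        System.out.println(distinctBy(values, (a, b) -> Arrays.equals(a, b)).size());
    }
}
```

The trade-off in this sketch is cost: without a hash function that agrees with the supplied equality, each element is compared against every element kept so far.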





[GitHub] [flink] liuyongvs commented on a diff in pull request #19623: [FLINK-27471][table] Add ARRAY_DISTINCT function

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on code in PR #19623:
URL: https://github.com/apache/flink/pull/19623#discussion_r1118251199


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayDistinctFunction.java:
##########

Review Comment:
   Hi @twalthr, thanks for reminding us.


