Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/11 12:20:44 UTC

[GitHub] [flink] godfreyhe opened a new pull request, #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

godfreyhe opened a new pull request, #20243:
URL: https://github.com/apache/flink/pull/20243

   
   ## What is the purpose of the change
   
   *Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql, see https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386481 for more details*
   
   
   ## Brief change log
   
     - *Introduce APPROX_COUNT_DISTINCT aggregate function in FlinkSqlOperatorTable*
     - *Introduce HyperLogLogPlusPlus*
     - *Implement ApproxCountDistinctAggFunction for different types*
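
For orientation, the estimator behind the items above can be sketched in a few lines. This is a minimal classic HyperLogLog, not the HyperLogLogPlusPlus class added by the PR: it omits the HLL++ bias correction, stores one register per byte, and uses a stand-in 64-bit mixer rather than XXH64. The class name `TinyHyperLogLog` and all of its members are hypothetical.

```java
/** Minimal classic HyperLogLog (not HLL++). All names here are hypothetical. */
final class TinyHyperLogLog {
    private final int p;            // number of index bits
    private final int m;            // number of registers, m = 2^p
    private final byte[] registers; // largest rank observed per register

    TinyHyperLogLog(int p) {
        if (p < 7 || p > 18) {
            throw new IllegalArgumentException("p must be in [7, 18]");
        }
        this.p = p;
        this.m = 1 << p;
        this.registers = new byte[m];
    }

    /** Stand-in 64-bit mixer (SplitMix64 finalizer); the PR hashes with XXH64 instead. */
    static long mix64(long z) {
        z += 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    /** Records one value; duplicates leave the registers unchanged. */
    void add(long value) {
        long h = mix64(value);
        int index = (int) (h >>> (64 - p)); // top p bits select a register
        long rest = h << p;                 // remaining bits, left-aligned
        int rank = Math.min(Long.numberOfLeadingZeros(rest), 64 - p) + 1;
        if (rank > registers[index]) {
            registers[index] = (byte) rank;
        }
    }

    /** Harmonic-mean estimate with the linear-counting fallback for small cardinalities. */
    long estimate() {
        double sum = 0.0;
        int zeros = 0;
        for (byte r : registers) {
            sum += 1.0 / (1L << r);
            if (r == 0) {
                zeros++;
            }
        }
        double alpha = 0.7213 / (1.0 + 1.079 / m); // bias constant, valid for m >= 128
        double raw = alpha * m * (double) m / sum;
        if (raw <= 2.5 * m && zeros > 0) {
            return Math.round(m * Math.log((double) m / zeros));
        }
        return Math.round(raw);
    }
}
```

Memory use is fixed by `p` and does not grow with the number of rows, which is the property that makes an approximate distinct count attractive for large batch inputs.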
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
     - *Added HyperLogLogPlusPlusTest to verify the logic of HyperLogLogPlusPlus*
     - *Added ApproxCountDistinctAggFunctionTest to verify the logic of ApproxCountDistinctAggFunctions*
     - *Extended SortAggregateTest to verify the plan for APPROX_COUNT_DISTINCT queries*
     - *Extended SortAggITCase to verify the result for APPROX_COUNT_DISTINCT queries*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] godfreyhe commented on pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on PR #20243:
URL: https://github.com/apache/flink/pull/20243#issuecomment-1189039700

   Thanks for the review. I will rebase onto master and squash the commits, then merge the PR once the build turns green.




[GitHub] [flink] swuferhong commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
swuferhong commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r922784619


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BatchApproxCountDistinctAggFunctions.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HllBuffer;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HyperLogLogPlusPlus;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XXH64.DEFAULT_SEED;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XXH64.hashInt;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XXH64.hashLong;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XXH64.hashUnsafeBytes;
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in APPROX_COUNT_DISTINCT aggregate function for Batch sql. */
+public class BatchApproxCountDistinctAggFunctions {
+
+    /** Base function for APPROX_COUNT_DISTINCT aggregate. */
+    public abstract static class ApproxCountDistinctAggFunction<T>
+            extends BuiltInAggregateFunction<Long, HllBuffer> {
+
+        private static final Double RELATIVE_SD = 0.01;

Review Comment:
   I think a comment is needed here to explain why 0.01 was chosen.
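
As background for this question, the sketch below shows how a target relative standard deviation translates into sketch size. It assumes the Flink class sizes itself the way Apache Spark's HyperLogLog++ helper does (the test in this PR says it is inspired by Spark): p = ceil(2 * log2(1.106 / relativeSD)) index bits, 6-bit registers, ten registers per 64-bit word. The class `HllSizingSketch` and its method names are hypothetical.

```java
/** Sketch: how a target relative standard deviation maps to HLL++ sizing. */
final class HllSizingSketch {

    /** Index bits p, using the formula from Apache Spark's HLL++ helper (assumed to match). */
    static int precisionFor(double relativeSd) {
        return (int) Math.ceil(2.0d * Math.log(1.106d / relativeSd) / Math.log(2.0d));
    }

    /** Number of registers, m = 2^p. */
    static int registersFor(int p) {
        return 1 << p;
    }

    /** 64-bit words needed when ten 6-bit registers share a word (Spark's layout, assumed). */
    static int wordsFor(int p) {
        return registersFor(p) / 10 + 1;
    }

    public static void main(String[] args) {
        for (double rsd : new double[] {0.01, 0.05, 0.4}) {
            int p = precisionFor(rsd);
            System.out.printf(
                    "relativeSD=%.2f -> p=%d, registers=%d, words=%d%n",
                    rsd, p, registersFor(p), wordsFor(p));
        }
    }
}
```

Under these assumptions, 0.01 yields p = 14, that is 16384 registers in 1639 words (about 13 KB per accumulator), while 0.4 yields p = 3, which falls below the minimum of 4 addressing bits that Spark's implementation requires.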



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala:
##########
@@ -310,6 +312,57 @@ class SortAggITCase extends AggregateITCaseBase("SortAggregate") {
       Seq(row("null"))
     )
   }
+
+  @Test
+  def testApproximateCountDistinct(): Unit = {
+    val dataId = TestValuesTableFactory.registerData(TestData.fullDataTypesData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE MyTable (
+         |  `boolean` BOOLEAN,
+         |  `byte` TINYINT,
+         |  `short` SMALLINT,
+         |  `int` INT,
+         |  `long` BIGINT,
+         |  `float` FLOAT,
+         |  `double` DOUBLE,
+         |  `decimal52` DECIMAL(5, 2),

Review Comment:
   Please add more decimal types, such as DECIMAL(14, 2) and DECIMAL(38, 2).





[GitHub] [flink] godfreyhe commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r922623326


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlusTest.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate.hyperloglog;
+
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The test of HyperLogLogPlusPlus is inspired from Apache Spark. */
+public class HyperLogLogPlusPlusTest {
+
+    @Test
+    public void testInvalidRelativeSD() {
+        assertThatThrownBy(() -> new HyperLogLogPlusPlus(0.4))

Review Comment:
   Both methods are widely used in Flink.





[GitHub] [flink] godfreyhe commented on pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on PR #20243:
URL: https://github.com/apache/flink/pull/20243#issuecomment-1189706693

   @flinkbot run azure




[GitHub] [flink] lsyldliu commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r923011831


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/DecimalData.java:
##########
@@ -48,7 +48,7 @@ public final class DecimalData implements Comparable<DecimalData> {
     static final int MAX_INT_DIGITS = 9;
 
     /** Maximum number of decimal digits a Long can represent. (1e18 < Long.MaxValue < 1e19) */
-    static final int MAX_LONG_DIGITS = 18;
+    public static final int MAX_LONG_DIGITS = 18;

Review Comment:
   Why not use `DecimalDataUtils.isByteArrayDecimal` instead of exposing it?
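
To illustrate what is at stake here, the sketch below hides the 18-digit threshold behind a predicate instead of exposing the constant to callers. It uses `java.math.BigDecimal` rather than Flink's `DecimalData` so that it runs on its own; `needsByteArray`, `hashDecimal`, and `mix64` are hypothetical names, and the mixer is a stand-in, not XXH64.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

/** Sketch: choose the hashing path for a decimal from its declared precision. */
final class DecimalHashSketch {

    // Same value as the constant quoted in the diff: 1e18 < Long.MAX_VALUE < 1e19.
    private static final int MAX_LONG_DIGITS = 18;

    /** Hypothetical predicate: true when the unscaled value may not fit in a long. */
    static boolean needsByteArray(int precision) {
        return precision > MAX_LONG_DIGITS;
    }

    /** Stand-in 64-bit mixer (SplitMix64 finalizer), not the XXH64 used by the PR. */
    static long mix64(long z) {
        z += 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    /** Hashes the unscaled value as a long when it fits, otherwise byte by byte. */
    static long hashDecimal(BigDecimal value, int precision) {
        BigInteger unscaled = value.unscaledValue();
        if (!needsByteArray(precision)) {
            return mix64(unscaled.longValueExact());
        }
        long h = 0L;
        for (byte b : unscaled.toByteArray()) {
            h = mix64(h ^ (b & 0xFF));
        }
        return h;
    }
}
```

With a predicate like this, callers never compare against the constant directly, so it could stay package-private.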





[GitHub] [flink] godfreyhe commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r922623503


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/BatchApproxCountDistinctAggFunctions.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HllBuffer;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HyperLogLogPlusPlus;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.DEFAULT_SEED;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashInt;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashLong;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashUnsafeBytes;
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in APPROX_COUNT_DISTINCT aggregate function for Batch sql. */
+public class BatchApproxCountDistinctAggFunctions {

Review Comment:
   I have added some test cases in SortAggITCase.





[GitHub] [flink] godfreyhe commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r923256167


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/SortAggITCase.scala:
##########
@@ -310,6 +312,57 @@ class SortAggITCase extends AggregateITCaseBase("SortAggregate") {
       Seq(row("null"))
     )
   }
+
+  @Test
+  def testApproximateCountDistinct(): Unit = {
+    val dataId = TestValuesTableFactory.registerData(TestData.fullDataTypesData)
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE MyTable (
+         |  `boolean` BOOLEAN,
+         |  `byte` TINYINT,
+         |  `short` SMALLINT,
+         |  `int` INT,
+         |  `long` BIGINT,
+         |  `float` FLOAT,
+         |  `double` DOUBLE,
+         |  `decimal52` DECIMAL(5, 2),

Review Comment:
   done





[GitHub] [flink] lsyldliu commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r921862422


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/XxHash64Function.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate.hyperloglog;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/** Utility class to get hash code based on {@link XXH64}. */
+public class XxHash64Function {
+
+    public static final long DEFAULT_SEED = 42L;
+
+    public static long hashInt(int i, long seed) {
+        return XXH64.hashInt(i, seed);
+    }
+
+    public static long hashLong(long l, long seed) {
+        return XXH64.hashLong(l, seed);
+    }
+
+    public static long hashUnsafeBytes(MemorySegment base, int offset, int length, long seed) {
+        return XXH64.hashUnsafeBytes(base, offset, length, seed);
+    }
+
+    /**
+     * Computes hash of a given `value`. The caller needs to check the validity of input `value`.
+     */
+    public static long hash(Object value, long seed) {
+        if (value == null) {
+            return seed;
+        } else if (value instanceof Boolean) {
+            boolean b = (boolean) value;
+            return hashInt(b ? 1 : 0, seed);
+        } else if (value instanceof Byte) {
+            return hashInt((Byte) value, seed);
+        } else if (value instanceof Short) {
+            return hashInt((Short) value, seed);
+        } else if (value instanceof Integer) {
+            return hashInt((Integer) value, seed);
+        } else if (value instanceof Long) {
+            return hashLong((Long) value, seed);
+        } else if (value instanceof Float) {
+            return hashInt(Float.floatToIntBits((Float) value), seed);
+        } else if (value instanceof Double) {
+            return hashLong(Double.doubleToLongBits((Double) value), seed);
+        } else if (value instanceof Date) {
+            return hashLong(((Date) value).getTime(), seed);
+        } else if (value instanceof Time) {
+            return hashLong(((Time) value).getTime(), seed);
+        } else if (value instanceof Timestamp) {
+            return hashLong(((Timestamp) value).getTime(), seed);
+        } else if (value instanceof BigDecimal) {

Review Comment:
   I don't understand why the `DecimalData` branch is omitted here.





[GitHub] [flink] godfreyhe commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r922623564


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/BatchApproxCountDistinctAggFunctions.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HllBuffer;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HyperLogLogPlusPlus;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.DEFAULT_SEED;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashInt;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashLong;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashUnsafeBytes;
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in APPROX_COUNT_DISTINCT aggregate function for Batch sql. */
+public class BatchApproxCountDistinctAggFunctions {
+
+    /** Base function for APPROX_COUNT_DISTINCT aggregate. */
+    public abstract static class ApproxCountDistinctAggFunction<T>
+            extends BuiltInAggregateFunction<Long, HllBuffer> {
+
+        private static final Double RELATIVE_SD = 0.01;
+        private transient HyperLogLogPlusPlus hll;
+
+        private final transient DataType valueDataType;
+
+        public ApproxCountDistinctAggFunction(LogicalType valueType) {
+            this.valueDataType = toInternalDataType(valueType);
+        }
+
+        @Override
+        public HllBuffer createAccumulator() {
+            hll = new HyperLogLogPlusPlus(RELATIVE_SD);
+            HllBuffer buffer = new HllBuffer();
+            buffer.array = new long[hll.getNumWords()];
+            resetAccumulator(buffer);
+            return buffer;
+        }
+
+        public void accumulate(HllBuffer buffer, Object input) throws Exception {
+            if (input != null) {
+                hll.updateByHashcode(buffer, getHashcode((T) input));
+            }
+        }
+
+        abstract long getHashcode(T t);

Review Comment:
   For performance: this approach avoids evaluating many switch cases for each record.
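
The trade-off described in this comment can be sketched as follows. `hashGeneric` re-checks the runtime type for every record, while the `Hasher` subclasses fix the type once, when the function is instantiated, in the spirit of the abstract `getHashcode(T)` quoted above. Every name in this block is hypothetical, and `mix64` is a stand-in for the real hash function.

```java
/** Sketch: per-record type dispatch versus type-specialized subclasses. */
final class HashDispatchSketch {

    /** Stand-in 64-bit mixer (SplitMix64 finalizer), not the XXH64 used by the PR. */
    static long mix64(long z) {
        z += 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    /** Generic path: walks the instanceof chain again for every record. */
    static long hashGeneric(Object value) {
        if (value instanceof Integer) {
            return mix64(((Integer) value).longValue());
        } else if (value instanceof Long) {
            return mix64((Long) value);
        } else if (value instanceof Double) {
            return mix64(Double.doubleToLongBits((Double) value));
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass());
    }

    /** Specialized path: the concrete subclass is chosen once per aggregate call. */
    abstract static class Hasher<T> {
        abstract long getHashcode(T value);
    }

    static final class IntHasher extends Hasher<Integer> {
        @Override
        long getHashcode(Integer value) {
            return mix64(value.longValue());
        }
    }

    static final class LongHasher extends Hasher<Long> {
        @Override
        long getHashcode(Long value) {
            return mix64(value);
        }
    }

    static final class DoubleHasher extends Hasher<Double> {
        @Override
        long getHashcode(Double value) {
            return mix64(Double.doubleToLongBits(value));
        }
    }
}
```

Both paths return the same hash for the same input; the specialized one only moves the type decision out of the per-record loop.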





[GitHub] [flink] godfreyhe commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r922623138


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/BatchApproxCountDistinctAggFunctions.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HllBuffer;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HyperLogLogPlusPlus;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.DEFAULT_SEED;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashInt;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashLong;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashUnsafeBytes;
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in APPROX_COUNT_DISTINCT aggregate function for Batch sql. */
+public class BatchApproxCountDistinctAggFunctions {

Review Comment:
   good catch





[GitHub] [flink] godfreyhe closed pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql
URL: https://github.com/apache/flink/pull/20243




[GitHub] [flink] lsyldliu commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r921864800


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlusTest.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate.hyperloglog;
+
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The test of HyperLogLogPlusPlus is inspired from Apache Spark. */
+public class HyperLogLogPlusPlusTest {
+
+    @Test
+    public void testInvalidRelativeSD() {
+        assertThatThrownBy(() -> new HyperLogLogPlusPlus(0.4))

Review Comment:
   What about using `CommonTestUtils.assertThrows()` here?
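
   For illustration only, a minimal sketch of an assertThrows-style check. It assumes the Flink class follows the precision formula of Spark's HLL++ (the test says it is inspired by Spark), under which a relative SD of 0.4 yields fewer than 4 addressing bits and is rejected. `RelativeSdSketch`, `precisionFor`, `numWords`, and the local `assertThrows` are hypothetical stand-ins, not Flink APIs, and the real signature of `CommonTestUtils.assertThrows()` may differ.

```java
import java.util.concurrent.Callable;

/** Hypothetical, Flink-free sketch; the formula is assumed to match Spark's HLL++. */
final class RelativeSdSketch {

    /** Number of addressing bits p for a requested relative standard deviation. */
    static int precisionFor(double relativeSD) {
        int p = (int) Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d));
        if (p < 4) {
            throw new IllegalArgumentException(
                    "HLL++ requires at least 4 bits for addressing. Use a lower error, at most 39%.");
        }
        return p;
    }

    /** Words needed for 2^p six-bit registers, assuming 10 registers per 64-bit word. */
    static int numWords(int p) {
        return (1 << p) / 10 + 1;
    }

    /** Stand-in helper: runs the code and returns the expected exception, or fails. */
    static <T extends Throwable> T assertThrows(Class<T> expected, Callable<?> code) {
        try {
            code.call();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t, t);
        }
        throw new AssertionError("Expected " + expected.getSimpleName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        // relativeSD = 0.01 is the value used by RELATIVE_SD in the PR.
        int p = precisionFor(0.01);
        System.out.println("p = " + p + ", words = " + numWords(p));

        // relativeSD = 0.4 is the invalid value exercised by testInvalidRelativeSD.
        IllegalArgumentException e =
                assertThrows(IllegalArgumentException.class, () -> precisionFor(0.4));
        System.out.println("rejected: " + e.getMessage());
    }
}
```

   Returning the caught exception lets the test assert on its message afterwards, which is roughly what the current `assertThatThrownBy(...)` chain does with `anyCauseMatches`.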



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/BatchApproxCountDistinctAggFunctions.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.aggfunctions;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.runtime.functions.aggregate.BuiltInAggregateFunction;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HllBuffer;
+import org.apache.flink.table.runtime.functions.aggregate.hyperloglog.HyperLogLogPlusPlus;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.DEFAULT_SEED;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashInt;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashLong;
+import static org.apache.flink.table.runtime.functions.aggregate.hyperloglog.XxHash64Function.hashUnsafeBytes;
+import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType;
+
+/** Built-in APPROX_COUNT_DISTINCT aggregate function for Batch sql. */
+public class BatchApproxCountDistinctAggFunctions {

Review Comment:
   This class must be placed in the runtime module; the planner package is not available at runtime after the Scala-free change.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/BatchApproxCountDistinctAggFunctions.java:
##########
@@ -0,0 +1,334 @@
+/** Built-in APPROX_COUNT_DISTINCT aggregate function for Batch sql. */
+public class BatchApproxCountDistinctAggFunctions {

Review Comment:
   It would be better to add an e2e test.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/XxHash64Function.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.aggregate.hyperloglog;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/** Utility class to get hash code based on {@link XXH64}. */
+public class XxHash64Function {

Review Comment:
   As discussed offline, this class is not needed.



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/functions/aggregate/hyperloglog/HyperLogLogPlusPlusTest.java:
##########
@@ -0,0 +1,171 @@
+/** The test of HyperLogLogPlusPlus is inspired from Apache Spark. */
+public class HyperLogLogPlusPlusTest {
+
+    @Test

Review Comment:
   It would be better to use JUnit 5.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/BatchApproxCountDistinctAggFunctions.java:
##########
@@ -0,0 +1,334 @@
+/** Built-in APPROX_COUNT_DISTINCT aggregate function for Batch sql. */
+public class BatchApproxCountDistinctAggFunctions {
+
+    /** Base function for APPROX_COUNT_DISTINCT aggregate. */
+    public abstract static class ApproxCountDistinctAggFunction<T>
+            extends BuiltInAggregateFunction<Long, HllBuffer> {
+
+        private static final Double RELATIVE_SD = 0.01;
+        private transient HyperLogLogPlusPlus hll;
+
+        private final transient DataType valueDataType;
+
+        public ApproxCountDistinctAggFunction(LogicalType valueType) {
+            this.valueDataType = toInternalDataType(valueType);
+        }
+
+        @Override
+        public HllBuffer createAccumulator() {
+            hll = new HyperLogLogPlusPlus(RELATIVE_SD);
+            HllBuffer buffer = new HllBuffer();
+            buffer.array = new long[hll.getNumWords()];
+            resetAccumulator(buffer);
+            return buffer;
+        }
+
+        public void accumulate(HllBuffer buffer, Object input) throws Exception {
+            if (input != null) {
+                hll.updateByHashcode(buffer, getHashcode((T) input));
+            }
+        }
+
+        abstract long getHashcode(T t);

Review Comment:
   Why provide an abstract method here and implement different subclasses?
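
   To make the question concrete, a rough sketch of the two shapes being compared: one subclass per input type overriding `getHashcode`, versus a single class that receives the hash function as a parameter. Everything here is hypothetical and Flink-free: `HashDispatchSketch` and its nested classes are invented names, a `HashSet` of hashes stands in for the HLL buffer, and the plain Java hash codes stand in for the xxHash64 calls used in the PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.ToLongFunction;

/** Hypothetical sketch of per-type subclasses versus an injected hash function. */
final class HashDispatchSketch {

    /** Shape used in the PR: shared accumulate logic, type-specific hashing. */
    abstract static class DistinctCounter<T> {
        // Exact set of hashes; a stand-in for the approximate HLL buffer.
        private final Set<Long> seen = new HashSet<>();

        void accumulate(T input) {
            if (input != null) { // nulls are ignored, as in the PR's accumulate()
                seen.add(getHashcode(input));
            }
        }

        long getValue() {
            return seen.size();
        }

        abstract long getHashcode(T t);
    }

    /** One subclass per type, each knowing how to hash its own input. */
    static final class IntCounter extends DistinctCounter<Integer> {
        @Override
        long getHashcode(Integer value) {
            return value.longValue();
        }
    }

    static final class StringCounter extends DistinctCounter<String> {
        @Override
        long getHashcode(String value) {
            return value.hashCode();
        }
    }

    /** Alternative: a single class, with the hash function supplied by the caller. */
    static final class FunctionCounter<T> extends DistinctCounter<T> {
        private final ToLongFunction<T> hasher;

        FunctionCounter(ToLongFunction<T> hasher) {
            this.hasher = hasher;
        }

        @Override
        long getHashcode(T value) {
            return hasher.applyAsLong(value);
        }
    }

    public static void main(String[] args) {
        IntCounter ints = new IntCounter();
        ints.accumulate(1);
        ints.accumulate(2);
        ints.accumulate(2);
        ints.accumulate(null);
        System.out.println("distinct ints: " + ints.getValue());

        FunctionCounter<String> strings = new FunctionCounter<>(String::hashCode);
        strings.accumulate("a");
        strings.accumulate("a");
        System.out.println("distinct strings: " + strings.getValue());
    }
}
```

   The subclass shape keeps each type's hashing in a named class, while the function-parameter shape avoids the class hierarchy; which one suits the planner's function lookup is for the PR author to judge.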





[GitHub] [flink] flinkbot commented on pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20243:
URL: https://github.com/apache/flink/pull/20243#issuecomment-1180348234

   ## CI report:
   
   * 22a678de4178f416ad55f14b4a6289aa775ece14 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] lincoln-lil commented on a diff in pull request #20243: [FLINK-28491][table-planner] Introduce APPROX_COUNT_DISTINCT aggregate function for batch sql

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on code in PR #20243:
URL: https://github.com/apache/flink/pull/20243#discussion_r923368739


##########
tools/maven/suppressions.xml:
##########
@@ -45,6 +45,9 @@ under the License.
 		<!-- Temporarily fix TM Metaspace memory leak caused by Apache Beam sdk harness. -->
 		<suppress files="org[\\/]apache[\\/]beam.*.java" checks="[a-zA-Z0-9]*"/>
 
+		<!-- Have to use guava directly -->

Review Comment:
   nit: the comment seems incorrect, and it may be better to move it to line 39.


