Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/22 03:05:20 UTC

[GitHub] [flink-table-store] JingsongLi opened a new pull request, #395: [FLINK-30125] Projection pushdown is not work for partial update

JingsongLi opened a new pull request, #395:
URL: https://github.com/apache/flink-table-store/pull/395

   We did not handle the projection properly in MergeFunction, so subsequent reads used the wrong field positions.
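
To illustrate the failure mode described above, here is a minimal sketch, not the project's code. It assumes rows are plain `Object[]` arrays and that column `a` sits at index 2, which matches the test data quoted later in this thread. `PartialUpdateSketch` and `project` are hypothetical names. The point is that the merge state has to be sized and indexed by the projected fields rather than by the full schema.

```java
/** Hypothetical stand-in for a partial-update merge over Object[] rows. */
final class PartialUpdateSketch {
    private final Object[] merged;

    // The merge state is sized from the projected field count, not the full schema.
    // A null projection means "no projection", so the full arity is used.
    PartialUpdateSketch(int fullFieldCount, int[][] projection) {
        int arity = projection == null ? fullFieldCount : projection.length;
        this.merged = new Object[arity];
    }

    // Non-null fields of the newer row overwrite the merged state.
    void add(Object[] row) {
        for (int i = 0; i < merged.length; i++) {
            if (row[i] != null) {
                merged[i] = row[i];
            }
        }
    }

    Object[] result() {
        return merged.clone();
    }

    // Applies a top-level projection such as {{2}} to a full row (assumed layout).
    static Object[] project(Object[] fullRow, int[][] projection) {
        Object[] out = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            out[i] = fullRow[projection[i][0]];
        }
        return out;
    }
}
```

With projection `{{2}}`, merging the two projected rows leaves a single field with value 4, consistent with the `SELECT a FROM T` assertion added in the test.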


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029996540


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java:
##########
@@ -169,7 +172,7 @@ public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
 
             List<ConcatRecordReader.ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
             MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
-                    new ReducerMergeFunctionWrapper(mergeFunction.copy());
+                    new ReducerMergeFunctionWrapper(mfFactory.create(valueProjection));

Review Comment:
   Yes, I should delete `.copy()`. We have a factory now, so we don't need a copy.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030087733


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java:
##########
@@ -65,8 +65,10 @@ public void testMergeRead() {
         batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))");
         batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')");
 
-        List<Row> result = batchSql("SELECT * FROM T");
-        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));
+        assertThat(batchSql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));
+
+        // projection
+        assertThat(batchSql("SELECT a FROM T")).containsExactlyInAnyOrder(Row.of(4));

Review Comment:
   At present, we do not support nested projection pushdown, so array and map fields are handled the same way as ordinary fields.



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java:
##########
@@ -711,6 +710,10 @@ public void testMergeInMemory() {
                                     (long) 10101000,
                                     (float) 0,
                                     1.11));
+
+            // projection
+            assertThat(batchSql("SELECT f,e FROM T1"))

Review Comment:
   Ditto.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030001343


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java:
##########
@@ -246,4 +245,65 @@ public List<DataField> keyFields(TableSchema schema) {
             return addKeyNamePrefix(schema.trimmedPrimaryKeysFields());
         }
     }
+
+    private static class PartialUpdateMergeFunctionFactory

Review Comment:
   I will create a factory for each merge function.





[GitHub] [flink-table-store] JingsongLi merged pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #395:
URL: https://github.com/apache/flink-table-store/pull/395




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029984725


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java:
##########
@@ -246,4 +245,65 @@ public List<DataField> keyFields(TableSchema schema) {
             return addKeyNamePrefix(schema.trimmedPrimaryKeysFields());
         }
     }
+
+    private static class PartialUpdateMergeFunctionFactory

Review Comment:
   I'd prefer moving these factories into inner classes of the corresponding merge functions, or making them independent classes.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java:
##########
@@ -169,7 +172,7 @@ public RecordReader<KeyValue> createReader(DataSplit split) throws IOException {
 
             List<ConcatRecordReader.ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
             MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
-                    new ReducerMergeFunctionWrapper(mergeFunction.copy());
+                    new ReducerMergeFunctionWrapper(mfFactory.create(valueProjection));

Review Comment:
   `.copy()` is lost with the current implementation.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/** Factory to create {@link MergeFunction}. */
+@FunctionalInterface
+public interface MergeFunctionFactory<T> extends Serializable {
+
+    MergeFunction<T> create(@Nullable int[][] projection);
+
+    static <T> MergeFunctionFactory<T> of(MergeFunction<T> mergeFunction) {
+        return new InstanceFactory<>(mergeFunction);
+    }
+
+    /** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */
+    class InstanceFactory<T> implements MergeFunctionFactory<T> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final MergeFunction<T> mergeFunction;
+
+        public InstanceFactory(MergeFunction<T> mergeFunction) {
+            this.mergeFunction = mergeFunction;
+        }
+
+        @Override
+        public MergeFunction<T> create(@Nullable int[][] projection) {
+            return mergeFunction;

Review Comment:
   `mergeFunction.copy()`?
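
The concern behind this comment can be shown with a small sketch. It is not the project's code: `LatestValue` and `FactoryComparison` are hypothetical names, and `java.util.function.Supplier` stands in for `MergeFunctionFactory`. It assumes merge functions carry per-reader state, which is why a factory that hands out the same instance on every call can leak state between callers.

```java
import java.util.function.Supplier;

/** Hypothetical stateful merge function: remembers the latest value it has seen. */
final class LatestValue {
    private String latest;

    void add(String value) {
        latest = value;
    }

    String result() {
        return latest;
    }
}

final class FactoryComparison {
    // Mirrors the reviewed InstanceFactory: every call returns the same object,
    // so state written by one caller is visible to all others.
    static Supplier<LatestValue> sharing(LatestValue instance) {
        return () -> instance;
    }

    // Alternative raised in review: every call builds a fresh object.
    static Supplier<LatestValue> fresh() {
        return LatestValue::new;
    }
}
```

Two objects obtained from `sharing(...)` are the same instance, while two obtained from `fresh()` are independent, which is the behavior the removed `.copy()` call used to provide.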



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/** Factory to create {@link MergeFunction}. */
+@FunctionalInterface
+public interface MergeFunctionFactory<T> extends Serializable {
+
+    MergeFunction<T> create(@Nullable int[][] projection);
+
+    static <T> MergeFunctionFactory<T> of(MergeFunction<T> mergeFunction) {
+        return new InstanceFactory<>(mergeFunction);
+    }
+
+    /** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */
+    class InstanceFactory<T> implements MergeFunctionFactory<T> {

Review Comment:
   Create a factory for each merge function? So that users are not allowed to create a merge function instance directly. They must create merge functions from the factories.





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029996389


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/** Factory to create {@link MergeFunction}. */
+@FunctionalInterface
+public interface MergeFunctionFactory<T> extends Serializable {
+
+    MergeFunction<T> create(@Nullable int[][] projection);
+
+    static <T> MergeFunctionFactory<T> of(MergeFunction<T> mergeFunction) {
+        return new InstanceFactory<>(mergeFunction);
+    }
+
+    /** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */
+    class InstanceFactory<T> implements MergeFunctionFactory<T> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final MergeFunction<T> mergeFunction;
+
+        public InstanceFactory(MergeFunction<T> mergeFunction) {
+            this.mergeFunction = mergeFunction;
+        }
+
+        @Override
+        public MergeFunction<T> create(@Nullable int[][] projection) {
+            return mergeFunction;

Review Comment:
   I should change this to `new instance`.





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030044691


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java:
##########
@@ -48,8 +48,17 @@ public KeyValue getResult() {
         return latestKv;
     }
 
-    @Override
-    public MergeFunction<KeyValue> copy() {
-        return new DeduplicateMergeFunction();
+    public static MergeFunctionFactory<KeyValue> factory() {
+        return new Factory();
+    }
+
+    private static class Factory implements MergeFunctionFactory<KeyValue> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+            return new DeduplicateMergeFunction();

Review Comment:
   Does this need a comment noting that nested projection is not supported?





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029996831


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/** Factory to create {@link MergeFunction}. */
+@FunctionalInterface
+public interface MergeFunctionFactory<T> extends Serializable {
+
+    MergeFunction<T> create(@Nullable int[][] projection);
+
+    static <T> MergeFunctionFactory<T> of(MergeFunction<T> mergeFunction) {
+        return new InstanceFactory<>(mergeFunction);
+    }
+
+    /** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */
+    class InstanceFactory<T> implements MergeFunctionFactory<T> {

Review Comment:
   Nice point!





[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030088540


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java:
##########
@@ -48,8 +48,17 @@ public KeyValue getResult() {
         return latestKv;
     }
 
-    @Override
-    public MergeFunction<KeyValue> copy() {
-        return new DeduplicateMergeFunction();
+    public static MergeFunctionFactory<KeyValue> factory() {
+        return new Factory();
+    }
+
+    private static class Factory implements MergeFunctionFactory<KeyValue> {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+            return new DeduplicateMergeFunction();

Review Comment:
   Actually, this class does not need any projection information.
   `PartialUpdateMergeFunction` does not need validation either, because the `Project` class will throw an exception.
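
As a rough illustration of the kind of check being relied on here, the sketch below flattens an `int[][]` projection into top-level indexes and rejects nested paths. It assumes each inner array is a field path, as in Flink's projection pushdown convention, so a path longer than one element addresses a nested field. `TopLevelProjection` and `toTopLevelIndexes` are hypothetical names and do not claim to reproduce the actual `Project` class.

```java
/** Hypothetical helper: converts a top-level-only projection into flat field indexes. */
final class TopLevelProjection {

    static int[] toTopLevelIndexes(int[][] projection) {
        int[] indexes = new int[projection.length];
        for (int i = 0; i < projection.length; i++) {
            // A path with more than one element would point into a nested field.
            if (projection[i].length != 1) {
                throw new IllegalArgumentException(
                        "Nested projection is not supported: path " + i);
            }
            indexes[i] = projection[i][0];
        }
        return indexes;
    }
}
```

Because a check of this kind fails fast on nested paths, a merge function that only receives validated projections would not need to repeat the validation itself.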





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030008452


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java:
##########
@@ -78,7 +78,7 @@ private CompactResult rewriteFullCompaction(List<List<SortedRun>> sections) thro
                                 runReaders,
                                 keyComparator,
                                 new FullChangelogMergeFunctionWrapper(
-                                        mergeFunction.copy(), maxLevel));
+                                        mfFactory.create(null), maxLevel));

Review Comment:
   Add a default implementation in the interface.
   
   ```java
   default MergeFunction<T> create() {
       return create(null);
   }
   ```






[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030044159


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PreAggregationITCase.java:
##########
@@ -711,6 +710,10 @@ public void testMergeInMemory() {
                                     (long) 10101000,
                                     (float) 0,
                                     1.11));
+
+            // projection
+            assertThat(batchSql("SELECT f,e FROM T1"))

Review Comment:
   Does this cover the array and list projection?





[GitHub] [flink-table-store] SteNicholas commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1030043673


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java:
##########
@@ -65,8 +65,10 @@ public void testMergeRead() {
         batchSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))");
         batchSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')");
 
-        List<Row> result = batchSql("SELECT * FROM T");
-        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));
+        assertThat(batchSql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));
+
+        // projection
+        assertThat(batchSql("SELECT a FROM T")).containsExactlyInAnyOrder(Row.of(4));

Review Comment:
   Does this cover the array and list projection?





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #395: [FLINK-30125] Projection pushdown is not work for partial update

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #395:
URL: https://github.com/apache/flink-table-store/pull/395#discussion_r1029984979


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/** Factory to create {@link MergeFunction}. */
+@FunctionalInterface
+public interface MergeFunctionFactory<T> extends Serializable {
+
+    MergeFunction<T> create(@Nullable int[][] projection);
+
+    static <T> MergeFunctionFactory<T> of(MergeFunction<T> mergeFunction) {
+        return new InstanceFactory<>(mergeFunction);
+    }
+
+    /** A {@link MergeFunctionFactory} from a {@link MergeFunction} instance. */
+    class InstanceFactory<T> implements MergeFunctionFactory<T> {

Review Comment:
   Create a factory for each merge function? So that users are not allowed to create a merge function instance directly (constructors should be private). They must create merge functions from the factories.
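
A minimal sketch of this suggestion, using simplified stand-in types rather than the real interfaces: `MergeFn`, `MergeFnFactory` and `DedupMergeFn` are hypothetical names, and the element type is reduced to `String`. The constructor is private, so the only way to obtain an instance is through the nested factory. The no-argument `create()` follows the default method proposed elsewhere in this review.

```java
import java.io.Serializable;

/** Simplified stand-in for a merge function. */
interface MergeFn<T> {
    void add(T value);

    T result();
}

/** Simplified stand-in for a serializable merge function factory. */
interface MergeFnFactory<T> extends Serializable {
    MergeFn<T> create(int[][] projection);

    // Convenience overload for callers that have no projection.
    default MergeFn<T> create() {
        return create(null);
    }
}

/** Keeps only the most recent value; instances come from factory() only. */
final class DedupMergeFn implements MergeFn<String> {
    private String latest;

    // Private: callers cannot bypass the factory.
    private DedupMergeFn() {}

    @Override
    public void add(String value) {
        latest = value;
    }

    @Override
    public String result() {
        return latest;
    }

    static MergeFnFactory<String> factory() {
        return new Factory();
    }

    private static final class Factory implements MergeFnFactory<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public MergeFn<String> create(int[][] projection) {
            // Deduplication ignores the projection and always starts fresh.
            return new DedupMergeFn();
        }
    }
}
```

Each `create` call returns an independent instance, so the shared-state question raised for `InstanceFactory` does not arise with this layout.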


