Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/12 13:30:17 UTC

[GitHub] [flink-table-store] ajian2002 opened a new pull request, #121: Introduce AggregatuibMergeFunction

ajian2002 opened a new pull request, #121:
URL: https://github.com/apache/flink-table-store/pull/121

   We can introduce more powerful merge strategies, such as merges that support pre-aggregation.
   The requirement comes from
   https://summer-ospp.ac.cn/#/org/prodetail/225120149
   This early version only supports applying an aggregate function to all columns.
   Follow-up work will improve the function as needed and allow different MergeFunctions to be used flexibly on different columns.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r871987331


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++)
+        {
+            Object currentField = getters[i].getFieldOrNull(value);
+            Object oldValue = row.getField(i);
+            Object result = sum(oldValue, currentField);
+            if (result != null)
+            {
+                row.setField(i, result);
+            }
+        }
+    }
+
+    private Object sum(Object oldValue, Object currentField) {
+        if (currentField == null)
+        {
+            return null;
+        }
+        if (oldValue == null)
+        {
+            return currentField;
+        }
+        if (oldValue instanceof Integer && currentField instanceof Integer)
+        {
+            return Integer.sum((Integer) oldValue, (Integer) currentField);
+        }
+        else if (oldValue instanceof Long && currentField instanceof Long)
+        {
+            return Long.sum((Long) oldValue, (Long) currentField);
+        }
+        else if (oldValue instanceof Double && currentField instanceof Double)
+        {
+            return Double.sum((Double) oldValue, (Double) currentField);
+        }
+        else if (oldValue instanceof Float && currentField instanceof Float)
+        {
+            return Float.sum((Float) oldValue, (Float) currentField);
+        }
+        else if (oldValue instanceof String && currentField instanceof String)
+        {
+            return "null";
+        }
+        return null;

Review Comment:
   Throw an exception instead of returning null for types that do not support summing. It is better to tell the user explicitly what we do not support than to quietly do something the user probably does not expect. It would also be better to perform this check in the constructor.
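A minimal sketch of the fail-fast check suggested above, assuming a hypothetical `ColumnType` enum as a stand-in for Flink's logical types and a hypothetical `SumSupport.validate` helper that the merge function's constructor would call once for its value (non-primary-key) columns. It throws with the offending column name instead of silently returning null on every record.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical column types for this sketch; a stand-in for Flink's LogicalType.
enum ColumnType { INT, BIGINT, FLOAT, DOUBLE, STRING, BOOLEAN }

final class SumSupport {

    // Types this sketch knows how to sum; everything else is rejected up front.
    static final Set<ColumnType> SUMMABLE =
            EnumSet.of(ColumnType.INT, ColumnType.BIGINT, ColumnType.FLOAT, ColumnType.DOUBLE);

    private SumSupport() {}

    // Intended to be called once from the merge function's constructor with the
    // value (non-primary-key) columns, so no per-record type check is needed later.
    static void validate(String[] valueColumnNames, ColumnType[] valueColumnTypes) {
        for (int i = 0; i < valueColumnNames.length; i++) {
            if (!SUMMABLE.contains(valueColumnTypes[i])) {
                throw new UnsupportedOperationException(
                        "Column '" + valueColumnNames[i] + "' has type " + valueColumnTypes[i]
                                + ", but sum aggregation only supports " + SUMMABLE);
            }
        }
    }
}
```

Rejecting at construction time means a misconfigured table fails when it is opened, with a message naming the column, rather than producing wrong results at merge time.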





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r873287282


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++)
+        {
+            Object currentField = getters[i].getFieldOrNull(value);
+            Object oldValue = row.getField(i);
+            Object result = sum(oldValue, currentField);
+            if (result != null)
+            {
+                row.setField(i, result);
+            }
+        }
+    }
+
+    private Object sum(Object oldValue, Object currentField) {
+        if (currentField == null)
+        {
+            return null;
+        }
+        if (oldValue == null)
+        {
+            return currentField;
+        }
+        if (oldValue instanceof Integer && currentField instanceof Integer)
+        {
+            return Integer.sum((Integer) oldValue, (Integer) currentField);
+        }
+        else if (oldValue instanceof Long && currentField instanceof Long)
+        {
+            return Long.sum((Long) oldValue, (Long) currentField);
+        }

Review Comment:
   I searched for a while and, sadly, didn't find any class that can be reused. Many of the aggregate functions in the planner module are bound to code generation and can't easily be reused. I think you need to implement your own aggregate classes.
   
   Implementing your own classes has another benefit: it decreases our dependency on Flink. We'd also like to support other systems in the future (we're currently implementing Hive support, for example), and if we rely too much on Flink it would be a headache to identify what should be exported from Flink.
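One possible shape for such self-written, Flink-independent aggregate classes, sketched under assumptions: the `FieldAggregator` interface, the `sum`/`max`/`min` kind names and the `Aggregators.create` factory are all hypothetical, and the sum only handles `Long` values. Null handling lives in one shared base class so each concrete aggregator only deals with non-null inputs.

```java
import java.util.Locale;

// Hypothetical Flink-independent aggregator: folds one input value into an accumulator.
interface FieldAggregator {
    Object agg(Object accumulator, Object input);
}

// Shared null handling: null inputs are ignored, the first non-null value seeds the accumulator.
abstract class NullSafeAggregator implements FieldAggregator {
    @Override
    public final Object agg(Object accumulator, Object input) {
        if (input == null) {
            return accumulator;
        }
        if (accumulator == null) {
            return input;
        }
        return aggNonNull(accumulator, input);
    }

    abstract Object aggNonNull(Object accumulator, Object input);
}

final class LongSumAggregator extends NullSafeAggregator {
    @Override
    Object aggNonNull(Object accumulator, Object input) {
        return (Long) accumulator + (Long) input;
    }
}

final class MaxAggregator extends NullSafeAggregator {
    @Override
    @SuppressWarnings("unchecked")
    Object aggNonNull(Object accumulator, Object input) {
        Comparable<Object> acc = (Comparable<Object>) accumulator;
        return acc.compareTo(input) >= 0 ? accumulator : input;
    }
}

final class MinAggregator extends NullSafeAggregator {
    @Override
    @SuppressWarnings("unchecked")
    Object aggNonNull(Object accumulator, Object input) {
        Comparable<Object> acc = (Comparable<Object>) accumulator;
        return acc.compareTo(input) <= 0 ? accumulator : input;
    }
}

// Hypothetical factory: maps a configured kind name to an aggregator instance.
final class Aggregators {
    private Aggregators() {}

    static FieldAggregator create(String kind) {
        switch (kind.toLowerCase(Locale.ROOT)) {
            case "sum":
                return new LongSumAggregator();
            case "max":
                return new MaxAggregator();
            case "min":
                return new MinAggregator();
            default:
                throw new IllegalArgumentException("Unsupported aggregate function: " + kind);
        }
    }
}
```

Nothing here imports Flink, which is the property the comment asks for: the same classes could in principle be reused by a Hive reader.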





[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r874814396


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ITCase for partial update.
+ */
+public class AggregationITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T3 (" + "a STRING," + "b INT," + "c INT ," + "PRIMARY KEY (a) NOT ENFORCED )" + " WITH ('merge-engine'='aggregation'     );");
+    }
+
+    @Test
+    public void testMergeInMemory() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T3 VALUES " + "('pk1',1, 2), " + "('pk1',1, 2)").await();
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 2, 4));
+    }
+
+    @Test
+    public void testMergeRead() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 2)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 4)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',2, 0)").await();
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 4, 6));
+    }
+
+
+    @Test
+    public void testMergeCompaction() throws ExecutionException, InterruptedException {
+        // Wait compaction
+        bEnv.executeSql("ALTER TABLE T3 SET ('commit.force-compact'='true')");
+
+        // key pk1
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 3, 1)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 5)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 6)").await();
+
+        // key pk2
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 6,7)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 9,0)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 4,4)").await();
+
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1",11,12), Row.of("pk2",19,11));
+    }
+
+//    @Test

Review Comment:
   > Usually, it's not appropriate to leave commented code. If it's useless, remove it instead.
   This test example comes from `flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java`.
   I don't know whether this test makes sense for `flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java`.
   Do I need to keep this test?
   








[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r885750661


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateMergeFunction.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record,
+ * aggregate specifies field on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregateMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<ColumnAggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+    private final Map<String, AggregationKind> aggregationKindMap;
+
+    public AggregateMergeFunction(
+            RowType primaryKeyType,
+            RowType rowType,
+            Map<String, AggregationKind> aggregationKindMap) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregationKindMap = aggregationKindMap;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            ColumnAggregateFunction<?> f = null;
+            if (aggregationKindMap.containsKey(rowNames.get(i))) {
+                f =
+                        ColumnAggregateFunctionFactory.getColumnAggregateFunction(
+                                aggregationKindMap.get(rowNames.get(i)), rowType.getTypeAt(i));
+            } else {
+                if (!isPrimaryKey[i]) {
+                    throw new IllegalArgumentException(
+                            "should  set aggregate function for every column not part of primary key");
+                }
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            ColumnAggregateFunction<?> f = aggregateFunctions.get(i);
+            if (isPrimaryKey[i]) {
+                // primary key
+                if (currentField != null) {
+                    row.setField(i, currentField);
+                }
+            } else {
+                if (f != null) {
+                    f.reset();
+                    Object oldValue = row.getField(i);
+                    if (oldValue != null) {
+                        f.aggregate(oldValue);
+                    }
+                    switch (value.getRowKind()) {
+                        case INSERT:
+                            f.aggregate(currentField);
+                            break;
+                        case DELETE:
+                        case UPDATE_AFTER:
+                        case UPDATE_BEFORE:
+                        default:
+                            throw new UnsupportedOperationException(
+                                    "Unsupported row kind: " + row.getRowKind());
+                    }
+                    Object result = f.getResult();
+                    if (result != null) {
+                        row.setField(i, result);

Review Comment:
   > `AggregateMergeFunction#reset` will be called before adding the first row of a new primary key, and `AggregateMergeFunction#getValue` will be called after the last row of that primary key. See `SortMergeReader` if you're interested.
   > 
   > > I don't know when AggregateMergeFunction#reset and AggregateMergeFunction#getValue are called, and the breakpoint is not hit when running the test.
   > 
   > Are you sure about this? My breakpoints at `AggregateMergeFunction#reset` and `AggregateMergeFunction#getValue` are triggered when running `AggregationITCase`.
   
   I'm sorry, I got it wrong.
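To make the call order described above concrete, here is a deliberately simplified sketch. `SimpleMergeFunction`, `SumMerge` and `KeyGroupDriver` are hypothetical stand-ins, not the real `MergeFunction` or `SortMergeReader` code: the driver walks key-sorted records and calls `reset()` before the first row of each key and `getValue()` after its last row. The sample data mirrors `testMergeCompaction` in `AggregationITCase`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified version of the reset/add/getValue contract.
interface SimpleMergeFunction {
    void reset();

    void add(long[] value);

    long[] getValue();
}

// Sums every value column for one primary key.
final class SumMerge implements SimpleMergeFunction {
    private long[] acc;

    @Override
    public void reset() {
        acc = null; // forget the previous key's result
    }

    @Override
    public void add(long[] value) {
        if (acc == null) {
            acc = value.clone(); // first row of the key seeds the accumulator
            return;
        }
        for (int i = 0; i < acc.length; i++) {
            acc[i] += value[i];
        }
    }

    @Override
    public long[] getValue() {
        return acc;
    }
}

final class KeyGroupDriver {
    private KeyGroupDriver() {}

    // Records must be sorted by key so that all rows of one key are adjacent.
    static List<long[]> mergeSorted(String[] keys, long[][] values, SimpleMergeFunction f) {
        List<long[]> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            boolean firstOfKey = i == 0 || !keys[i].equals(keys[i - 1]);
            if (firstOfKey) {
                f.reset(); // before the first row of a new key
            }
            f.add(values[i]);
            boolean lastOfKey = i == keys.length - 1 || !keys[i].equals(keys[i + 1]);
            if (lastOfKey) {
                out.add(f.getValue()); // after the last row of that key
            }
        }
        return out;
    }
}
```

Because `reset()` drops the accumulator and the next key starts from a fresh copy, the arrays already returned by `getValue()` are not mutated by later keys.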





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r871982968


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ITCase for partial update.
+ */
+public class AggregationITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T3 (" + "a STRING," + "b INT," + "c INT ," + "PRIMARY KEY (a) NOT ENFORCED )" + " WITH ('merge-engine'='aggregation'     );");
+    }
+
+    @Test
+    public void testMergeInMemory() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T3 VALUES " + "('pk1',1, 2), " + "('pk1',1, 2)").await();
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 2, 4));
+    }
+
+    @Test
+    public void testMergeRead() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 2)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 4)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',2, 0)").await();
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 4, 6));
+    }
+
+
+    @Test
+    public void testMergeCompaction() throws ExecutionException, InterruptedException {
+        // Wait compaction
+        bEnv.executeSql("ALTER TABLE T3 SET ('commit.force-compact'='true')");
+
+        // key pk1
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 3, 1)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 5)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 6)").await();
+
+        // key pk2
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 6,7)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 9,0)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 4,4)").await();
+
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1",11,12), Row.of("pk2",19,11));
+    }
+
+//    @Test

Review Comment:
   Usually, it's not appropriate to leave commented code. If it's useless, remove it instead.





[GitHub] [flink-table-store] ajian2002 closed pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 closed pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction
URL: https://github.com/apache/flink-table-store/pull/121




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r877715408


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java:
##########
@@ -201,17 +204,28 @@ public static FileStoreImpl createWithPrimaryKey(
                                 .collect(Collectors.toList()));
 
         MergeFunction mergeFunction;
+        Map<String, String> rightConfMap =
+                options.getFilterConf(e -> e.getKey().endsWith(".aggregate-function"));

Review Comment:
   Not sure what you mean. Could you provide an example? What is an `extended aggregate function`?
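The diff above collects every table option whose key ends with `.aggregate-function`. A minimal sketch of turning such options into a per-column map, assuming (hypothetically) keys of the form `<column>.aggregate-function` with values such as `sum`; `AggKind` and `AggregateOptionParser` are illustrative names, not the PR's actual `AggregationKind` or option API.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical aggregation kinds for this sketch.
enum AggKind { SUM, MAX, MIN }

final class AggregateOptionParser {
    static final String SUFFIX = ".aggregate-function";

    private AggregateOptionParser() {}

    // Keeps only '<column>.aggregate-function' entries and maps column name -> kind.
    static Map<String, AggKind> parse(Map<String, String> options) {
        Map<String, AggKind> result = new HashMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            String key = e.getKey();
            if (!key.endsWith(SUFFIX)) {
                continue; // unrelated option such as 'merge-engine'
            }
            String column = key.substring(0, key.length() - SUFFIX.length());
            try {
                result.put(column, AggKind.valueOf(e.getValue().trim().toUpperCase(Locale.ROOT)));
            } catch (IllegalArgumentException ex) {
                throw new IllegalArgumentException(
                        "Unknown aggregate function '" + e.getValue() + "' for column '" + column + "'", ex);
            }
        }
        return result;
    }
}
```

Under this assumption, a DDL option such as `'b.aggregate-function'='sum'` would yield `{b=SUM}`, and an unknown function name fails with a message naming the column.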





[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1148558266

   I have resolved all the issues; you can continue reviewing my code, @tsreaper.




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r871965386


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++)
+        {
+            Object currentField = getters[i].getFieldOrNull(value);
+            Object oldValue = row.getField(i);
+            Object result = sum(oldValue, currentField);
+            if (result != null)
+            {
+                row.setField(i, result);
+            }
+        }
+    }
+
+    private Object sum(Object oldValue, Object currentField) {
+        if (currentField == null)
+        {
+            return null;
+        }
+        if (oldValue == null)
+        {
+            return currentField;
+        }
+        if (oldValue instanceof Integer && currentField instanceof Integer)
+        {
+            return Integer.sum((Integer) oldValue, (Integer) currentField);
+        }
+        else if (oldValue instanceof Long && currentField instanceof Long)
+        {
+            return Long.sum((Long) oldValue, (Long) currentField);
+        }

Review Comment:
   These `instanceof` checks will be performed on a per-record basis and have a big impact on performance. That's why we use `RowData.FieldGetter` instead of `instanceof` to fetch columns out of a row.
   
   A better approach is to create an `Aggregator` class similar to `RowData.FieldGetter`, and in the constructor of `AggregationMergeFunction` choose an `Aggregator` for each column according to its type. On the per-record path we can then use the `Aggregator` directly, without these checks. I recommend taking this route.
   
   An even better approach is to use Flink's code generation system, but that might be too complex for your first contribution, considering how it is used and the fact that the internal system of the Flink table planner is now awkward to call from the outside due to the Scala-free change in Flink 1.15. If you're interested, see `AggWithoutKeysCodeGenerator` in Flink.
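   
   A minimal sketch of the `Aggregator` route, written in plain Java so it runs without Flink on the classpath. `Aggregator`, `Aggregators.forType`, `SumMerger` and `ColumnType` are hypothetical names; `ColumnType` stands in for Flink's `LogicalTypeRoot`, and `Object[]` stands in for `GenericRowData`. The type switch runs once per column in the constructor, so the per-record loop makes only one interface call per column.
   
   ```java
   import java.util.Arrays;
   
   /** Hypothetical stand-in for Flink's LogicalTypeRoot, limited to the summable types. */
   enum ColumnType { INTEGER, BIGINT, FLOAT, DOUBLE }
   
   /** Hypothetical per-column aggregator, chosen once like RowData.FieldGetter. */
   interface Aggregator {
       /** Combines the accumulated value (possibly null) with a new value (possibly null). */
       Object agg(Object accumulator, Object input);
   }
   
   final class Aggregators {
       private Aggregators() {}
   
       /** Picks the aggregator by type once, so no instanceof checks run per record. */
       static Aggregator forType(ColumnType type) {
           switch (type) {
               case INTEGER:
                   return nullSafe((a, b) -> (Integer) a + (Integer) b);
               case BIGINT:
                   return nullSafe((a, b) -> (Long) a + (Long) b);
               case FLOAT:
                   return nullSafe((a, b) -> (Float) a + (Float) b);
               case DOUBLE:
                   return nullSafe((a, b) -> (Double) a + (Double) b);
               default:
                   throw new UnsupportedOperationException("Unsupported type for sum: " + type);
           }
       }
   
       /** A null input keeps the accumulator; a null accumulator takes the input. */
       private static Aggregator nullSafe(Aggregator sum) {
           return (acc, in) -> in == null ? acc : acc == null ? in : sum.agg(acc, in);
       }
   }
   
   /** Hypothetical merge function that sums every column. */
   final class SumMerger {
       private final Aggregator[] aggregators;
       private Object[] row;
   
       SumMerger(ColumnType... types) {
           aggregators = Arrays.stream(types).map(Aggregators::forType).toArray(Aggregator[]::new);
           reset();
       }
   
       void reset() {
           row = new Object[aggregators.length];
       }
   
       /** Per-record path: only calls into the pre-selected aggregators. */
       void add(Object... values) {
           for (int i = 0; i < aggregators.length; i++) {
               row[i] = aggregators[i].agg(row[i], values[i]);
           }
       }
   
       Object[] getValue() {
           return row.clone();
       }
   }
   ```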





[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r872505878


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ITCase for partial update.
+ */
+public class AggregationITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T3 (" + "a STRING," + "b INT," + "c INT ," + "PRIMARY KEY (a) NOT ENFORCED )" + " WITH ('merge-engine'='aggregation'     );");
+    }
+
+    @Test
+    public void testMergeInMemory() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T3 VALUES " + "('pk1',1, 2), " + "('pk1',1, 2)").await();
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 2, 4));
+    }
+
+    @Test
+    public void testMergeRead() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 2)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 4)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',2, 0)").await();
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 4, 6));
+    }
+
+
+    @Test
+    public void testMergeCompaction() throws ExecutionException, InterruptedException {
+        // Wait compaction
+        bEnv.executeSql("ALTER TABLE T3 SET ('commit.force-compact'='true')");
+
+        // key pk1
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 3, 1)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 5)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 6)").await();
+
+        // key pk2
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 6,7)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 9,0)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 4,4)").await();
+
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1",11,12), Row.of("pk2",19,11));
+    }
+
+//    @Test

Review Comment:
   This test case is copied from `flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java`.
   I'm not sure whether it makes sense for `flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java`.
   Do I need to keep this test?





[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r872947138


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++)
+        {
+            Object currentField = getters[i].getFieldOrNull(value);
+            Object oldValue = row.getField(i);
+            Object result = sum(oldValue, currentField);
+            if (result != null)
+            {
+                row.setField(i, result);
+            }
+        }
+    }
+
+    private Object sum(Object oldValue, Object currentField) {
+        if (currentField == null)
+        {
+            return null;
+        }
+        if (oldValue == null)
+        {
+            return currentField;
+        }
+        if (oldValue instanceof Integer && currentField instanceof Integer)
+        {
+            return Integer.sum((Integer) oldValue, (Integer) currentField);
+        }
+        else if (oldValue instanceof Long && currentField instanceof Long)
+        {
+            return Long.sum((Long) oldValue, (Long) currentField);
+        }
+        else if (oldValue instanceof Double && currentField instanceof Double)
+        {
+            return Double.sum((Double) oldValue, (Double) currentField);
+        }
+        else if (oldValue instanceof Float && currentField instanceof Float)
+        {
+            return Float.sum((Float) oldValue, (Float) currentField);
+        }
+        else if (oldValue instanceof String && currentField instanceof String)
+        {
+            return "null";
+        }
+        return null;

Review Comment:
   Is there a more explicit exception than RuntimeException?





[GitHub] [flink-table-store] ajian2002 commented on pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1126169687

   If I want to test the same MergeFunction on tables with different structures, what should I do? (Should I create another test file?)




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r873281945


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+    private final RowType primaryKeyType;
+    private final RowType rowType;
+    private final Set<String> primaryKeyNames;
+    private final ArrayList<String> rowNames;
+
+    private final ArrayList<AggregationFunction<Object>> types;
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(
+            RowData.FieldGetter[] fieldGetters, RowType primaryKeyType, RowType rowType) {
+        this.getters = fieldGetters;
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.primaryKeyNames = new HashSet<>(primaryKeyType.getFieldNames());
+        this.rowNames = new ArrayList<>(rowType.getFieldNames());
+        this.types = new ArrayList<>(rowType.getFieldCount());
+        AggregationFunctionFactory factory = Aggregations.SUM.getFactory();

Review Comment:
   Nice find. I didn't even know we had such a class in Flink.
   
   However, this class is marked as `@Internal` in Flink, which means it is not a public API. Also, this class supports neither retraction nor other aggregate functions such as `avg`. I'm not sure whether there is a better way to do this or whether it would be better to create our own aggregate function classes. I'll leave this to the other reviewers.
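   
   If we write our own aggregate function classes, retraction and `avg` mostly come down to what the accumulator stores. A minimal sketch, assuming hypothetical `RetractableAgg`, `LongSum` and `LongAvg` classes (these are not Flink APIs): a sum can undo a value by subtracting it, while an average has to keep a (sum, count) pair, because an average on its own cannot be un-merged.
   
   ```java
   /** Hypothetical accumulator interface with retraction support. */
   interface RetractableAgg<R> {
       void accumulate(long value);
   
       /** Undoes a value previously passed to accumulate(). */
       void retract(long value);
   
       R getResult();
   }
   
   /** Sum only needs the running total; retracting subtracts. */
   final class LongSum implements RetractableAgg<Long> {
       private long sum;
   
       public void accumulate(long value) { sum += value; }
   
       public void retract(long value) { sum -= value; }
   
       public Long getResult() { return sum; }
   }
   
   /** Avg keeps both sum and count, since an average alone cannot be retracted. */
   final class LongAvg implements RetractableAgg<Double> {
       private long sum;
       private long count;
   
       public void accumulate(long value) { sum += value; count++; }
   
       public void retract(long value) { sum -= value; count--; }
   
       /** Returns null when no values remain, like SQL AVG over an empty set. */
       public Double getResult() { return count == 0 ? null : (double) sum / count; }
   }
   ```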





[GitHub] [flink-table-store] tsreaper commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1132579400

   You can try this test case if you're interested.
   ```java
   @Test
   public void myTest() throws Exception {
       String ddl3 =
               "CREATE TABLE IF NOT EXISTS T5 ( dt STRING, hr INT, price INT, PRIMARY KEY (dt, hr) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', 'price.aggregate-function' = 'sum' );";
       String tmpPath;
       try {
           tmpPath = TEMPORARY_FOLDER.newFolder().toURI().toString();
       } catch (Exception e) {
           throw new RuntimeException(e);
       }
       String ddl4 =
               "CREATE TABLE IF NOT EXISTS A ( dt STRING, hr INT, price INT ) WITH ( 'connector' = 'filesystem', 'path' = '"
                       + tmpPath
                       + "', 'format' = 'avro' );";
       String ddl5 =
               "CREATE TABLE IF NOT EXISTS P ( dt STRING, hr INT, price INT ) WITH ( 'connector' = 'print' );";
       bEnv.executeSql(ddl3).await();
       bEnv.executeSql(ddl4).await();
       sEnv.executeSql(ddl3).await();
       sEnv.executeSql(ddl4).await();
       sEnv.executeSql(ddl5).await();
       bEnv.executeSql(
                       "INSERT INTO A VALUES ('20220101', 8, 100), ('20220101', 8, 300), ('20220101', 8, 200), ('20220101', 8, 400), ('20220101', 9, 100)")
               .await();
       sEnv.executeSql(
                       "INSERT INTO T5 SELECT dt, hr, price FROM ("
                               + "  SELECT dt, hr, price, ROW_NUMBER() OVER (PARTITION BY dt, hr ORDER BY price desc) AS rn FROM A"
                               + ") WHERE rn <= 2")
               .await();
       sEnv.executeSql(
                       "INSERT INTO P SELECT dt, hr, price FROM ("
                               + "  SELECT dt, hr, price, ROW_NUMBER() OVER (PARTITION BY dt, hr ORDER BY price desc) AS rn FROM A"
                               + ") WHERE rn <= 2")
               .await();
       List<Row> result = iteratorToList(bEnv.from("T5").execute().collect());
       System.out.println(result);
   }
   ```
   
   This SQL script calculates the sum of the two largest prices in each hour. I've also created a print sink so that you can see what goes into the sink.
   ```
   1> +I[20220101, 8, 100]
   1> +I[20220101, 8, 300] # insert price 100 and 300
   1> -D[20220101, 8, 100]
   1> +I[20220101, 8, 200] # price 200 comes, so 100 should be removed out of the result
   1> -D[20220101, 8, 200]
   1> +I[20220101, 8, 400] # price 400 comes, so 200 should be removed out of the result
   1> +I[20220101, 9, 100] # 100 in another hour, not affected
   ```
   
   The expected result of our aggregate function should be `[+I[20220101, 8, 700], +I[20220101, 9, 100]]` (400 + 300 for hour 8), but the current implementation unfortunately prints out `[+I[20220101, 8, 1300], +I[20220101, 9, 100]]`.
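   
   Both numbers can be reproduced by replaying the hour-8 changelog from the print sink. A minimal sketch in plain Java (the `Changelog` class and `replay` helper are hypothetical): a sum that applies `-D` as a subtraction gives 100 + 300 - 100 + 200 - 200 + 400 = 700, which is the two largest hour-8 prices (400 and 300), while a sum that ignores the row kind adds every value and gives 1300.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   
   final class Changelog {
       /** One changelog entry: insert is true for +I and false for -D. */
       static final class Change {
           final boolean insert;
           final int price;
   
           Change(boolean insert, int price) {
               this.insert = insert;
               this.price = price;
           }
       }
   
       /** The hour-8 entries from the print sink output, in order. */
       static final List<Change> HOUR_8 = Arrays.asList(
               new Change(true, 100), new Change(true, 300),
               new Change(false, 100), new Change(true, 200),
               new Change(false, 200), new Change(true, 400));
   
       /** Sums prices; when respectRowKind is false, deletions are added like inserts. */
       static int replay(List<Change> changes, boolean respectRowKind) {
           int sum = 0;
           for (Change c : changes) {
               sum += (c.insert || !respectRowKind) ? c.price : -c.price;
           }
           return sum;
       }
   }
   ```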
   
   Sorry that my previous comments on row kinds were somewhat misleading. Flink does have 4 row kinds, but in Table Store we only consider key-value pairs instead of rows. Key-value pairs only have two value kinds (provided by `KeyValue#valueKind`): `ValueKind.ADD` means updating the corresponding key with the value (you can think of it as `Map#compute` in Java), and `ValueKind.DELETE` means removing the corresponding key (like `Map#remove` in Java). Although each key and each value is represented by a `RowData`, their row kinds are meaningless and are always `INSERT`. Only their value kinds matter.
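   
   The `Map` analogy can be made concrete. A minimal sketch, assuming a hypothetical local `ValueKind` enum and `KeyValueStore` class that only mirror the semantics described above (this is not Table Store's actual `KeyValue` code): `ADD` merges the value into the existing one through `Map#compute`, using sum as the merge function here, and `DELETE` drops the key through `Map#remove`.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   /** Hypothetical mirror of the two value kinds described above. */
   enum ValueKind { ADD, DELETE }
   
   final class KeyValueStore {
       private final Map<String, Long> table = new HashMap<>();
   
       /** ADD updates the key via the merge function (sum here); DELETE removes the key. */
       void apply(String key, ValueKind kind, long value) {
           switch (kind) {
               case ADD:
                   table.compute(key, (k, old) -> old == null ? value : old + value);
                   break;
               case DELETE:
                   table.remove(key);
                   break;
               default:
                   throw new IllegalArgumentException("Unknown value kind: " + kind);
           }
       }
   
       Long get(String key) {
           return table.get(key);
       }
   }
   ```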
   
   To connect Table Store with Flink, we parse `RowData` from Flink into `KeyValue` according to both its row kind and the merge function used in the `flink-table-store-connector` module. As the number of merge functions grows, we should consider extracting a common method to do this parsing. But that is out of the scope of this PR.
   
   For this PR, I suggest that we only support the `INSERT` row kind and leave support for the other row kinds for later. In general, each PR should be as small as possible and focus on only one thing.




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r876537585


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java:
##########
@@ -201,17 +204,28 @@ public static FileStoreImpl createWithPrimaryKey(
                                 .collect(Collectors.toList()));
 
         MergeFunction mergeFunction;
+        Map<String, String> rightConfMap =
+                options.getFilterConf(e -> e.getKey().endsWith(".aggregate-function"));

Review Comment:
   Move this into the `AGGREGATION` branch.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.io.Serializable;
+
+/**
+ * 自定义的列聚合抽象类.

Review Comment:
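   Flink Table Store is currently a sub-project of Flink, so comments must be in English only (the Javadoc above, "自定义的列聚合抽象类.", means "Custom abstract class for column aggregation.").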



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.io.Serializable;
+
+/**
+ * 自定义的列聚合抽象类.
+ *
+ * @param <T>
+ */
+public interface AggregateFunction<T> extends Serializable {
+    //     T aggregator;
+
+    T getResult();
+
+    default void init() {
+        reset();
+    }
+
+    void reset();
+
+    default void aggregate(Object value) {
+        aggregate(value, true);
+    }
+
+    void aggregate(Object value, boolean add);
+
+    void reset(Object value);

Review Comment:
   What is this method used for?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.io.Serializable;
+
+/**
+ * 自定义的列聚合抽象类.
+ *
+ * @param <T>
+ */
+public interface AggregateFunction<T> extends Serializable {
+    //     T aggregator;

Review Comment:
   Remove the unused, commented-out code.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<AggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+
+    private final Set<String> aggregateColumnNames;
+
+    public AggregationMergeFunction(
+            RowType primaryKeyType, RowType rowType, Set<String> aggregateColumnNames) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregateColumnNames = aggregateColumnNames;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        Arrays.fill(isPrimaryKey, false);
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            AggregateFunction<?> f = null;
+            if (aggregateColumnNames.contains(rowNames.get(i))) {
+                f = choiceRightAggregateFunction(rowType.getTypeAt(i).getDefaultConversion());
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    private AggregateFunction<?> choiceRightAggregateFunction(Class<?> c) {
+        AggregateFunction<?> f = null;
+        if (Double.class.equals(c)) {
+            f = new DoubleAggregateFunction();
+        } else if (Long.class.equals(c)) {
+            f = new LongAggregateFunction();
+        } else if (Integer.class.equals(c)) {
+            f = new IntegerAggregateFunction();
+        } else if (Float.class.equals(c)) {
+            f = new FloatAggregateFunction();
+        }
+        return f;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            AggregateFunction<?> f = aggregateFunctions.get(i);
+            if (isPrimaryKey[i]) {
+                // primary key
+                if (currentField != null) {
+                    row.setField(i, currentField);
+                }
+            } else {
+                if (f != null) {
+                    f.reset();
+                    Object oldValue = row.getField(i);
+                    if (oldValue != null) {
+                        f.aggregate(oldValue);
+                    }
+                    switch (row.getRowKind()) {
+                        case INSERT:
+                            f.aggregate(currentField);
+                            break;
+                        case DELETE:
+                            f.aggregate(currentField, false);
+                            break;
+                        case UPDATE_AFTER:
+                        case UPDATE_BEFORE:
+                        default:
+                            throw new UnsupportedOperationException(
+                                    "Unsupported row kind: " + row.getRowKind());

Review Comment:
   You can treat `UPDATE_BEFORE` as `DELETE` and `UPDATE_AFTER` as `INSERT`. They actually differ in some other ways, but in our scenario they are the same.
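   
   A minimal sketch of that mapping. It assumes a local `RowKind` enum that mirrors the four constants of Flink's `org.apache.flink.types.RowKind`, so the example runs standalone, and a hypothetical `RetractableSum` accumulator. The mapping should be applied to the kind of the incoming record; note that the quoted diff switches on `row.getRowKind()`, and a `GenericRowData` created with `new GenericRowData(arity)` in `reset()` always has kind `INSERT`.
   
   ```java
   /** Local mirror of the constants in Flink's org.apache.flink.types.RowKind. */
   enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }
   
   /** Hypothetical sum accumulator that adds or retracts depending on the row kind. */
   final class RetractableSum {
       private long sum;
   
       void apply(RowKind kind, long value) {
           switch (kind) {
               case INSERT:
               case UPDATE_AFTER:
                   // UPDATE_AFTER carries the new value, so it is accumulated like INSERT.
                   sum += value;
                   break;
               case UPDATE_BEFORE:
               case DELETE:
                   // UPDATE_BEFORE carries the old value, so it is retracted like DELETE.
                   sum -= value;
                   break;
               default:
                   throw new UnsupportedOperationException("Unsupported row kind: " + kind);
           }
       }
   
       long getSum() {
           return sum;
       }
   }
   ```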



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java:
##########
@@ -201,17 +204,28 @@ public static FileStoreImpl createWithPrimaryKey(
                                 .collect(Collectors.toList()));
 
         MergeFunction mergeFunction;
+        Map<String, String> rightConfMap =
+                options.getFilterConf(e -> e.getKey().endsWith(".aggregate-function"));
         switch (mergeEngine) {
             case DEDUPLICATE:
                 mergeFunction = new DeduplicateMergeFunction();
                 break;
             case PARTIAL_UPDATE:
-                List<LogicalType> fieldTypes = rowType.getChildren();
-                RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
-                for (int i = 0; i < fieldTypes.size(); i++) {
-                    fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+                mergeFunction = new PartialUpdateMergeFunction(rowType);
+                break;
+            case AGGREGATION:
+                Set<String> valueSet = new HashSet<>(rightConfMap.values());
+                if (valueSet.size() != 1 || !valueSet.contains("sum")) {
+                    throw new IllegalArgumentException(
+                            "Aggregate function must be the same for all columns");

Review Comment:
   Use a factory class to produce the corresponding aggregate function. If we want to support other aggregate functions in the future, this implementation will be hard to extend.
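   
   One possible shape for such a factory, as a minimal sketch. The `<column>.aggregate-function` option format comes from the DDL used in this PR, while `ColumnAggregateFunction`, `SumFunction`, `AggregateFunctionFactory` and `fromOptions` are hypothetical names. Supporting a new function then means adding one registry entry, and an unsupported name fails with a descriptive `IllegalArgumentException` that lists the supported names.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.function.Supplier;
   
   /** Hypothetical minimal column aggregate function. */
   interface ColumnAggregateFunction {
       void aggregate(long value);
   
       long getResult();
   }
   
   final class SumFunction implements ColumnAggregateFunction {
       private long sum;
   
       public void aggregate(long value) { sum += value; }
   
       public long getResult() { return sum; }
   }
   
   final class AggregateFunctionFactory {
       private static final String SUFFIX = ".aggregate-function";
   
       /** Registry of supported names; a new function only needs a new entry here. */
       private static final Map<String, Supplier<ColumnAggregateFunction>> REGISTRY = new HashMap<>();
   
       static {
           REGISTRY.put("sum", SumFunction::new);
       }
   
       private AggregateFunctionFactory() {}
   
       static ColumnAggregateFunction create(String name) {
           Supplier<ColumnAggregateFunction> supplier = REGISTRY.get(name);
           if (supplier == null) {
               throw new IllegalArgumentException(
                       "Unsupported aggregate function '" + name + "', supported: " + REGISTRY.keySet());
           }
           return supplier.get();
       }
   
       /** Builds one function per column from options like 'price.aggregate-function' = 'sum'. */
       static Map<String, ColumnAggregateFunction> fromOptions(Map<String, String> options) {
           Map<String, ColumnAggregateFunction> functions = new HashMap<>();
           for (Map.Entry<String, String> e : options.entrySet()) {
               if (e.getKey().endsWith(SUFFIX)) {
                   String column = e.getKey().substring(0, e.getKey().length() - SUFFIX.length());
                   functions.put(column, create(e.getValue()));
               }
           }
           return functions;
       }
   }
   ```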



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunction.java:
##########
@@ -0,0 +1,159 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.io.Serializable;
+
+/**
+ * Custom column aggregation abstract class.
+ *
+ * @param <T>

Review Comment:
   Explain what `T` is. You can also remove this line if there is nothing to explain.
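For example, the Javadoc could state that `T` is the Java type of the aggregated column value. This is a hypothetical illustration: `TypedAggregator` and `IntSum` are not names from the PR, and the sketch assumes one implementation per column type (such as `Integer` for `INT`).

```java
import java.io.Serializable;

/**
 * Aggregates the values of a single column.
 *
 * @param <T> the Java type of the aggregated column value, for example {@code Integer} for an
 *     {@code INT} column or {@code Long} for a {@code BIGINT} column
 */
interface TypedAggregator<T> extends Serializable {

    /** Returns the current aggregate, or {@code null} if nothing has been aggregated yet. */
    T getResult();

    /** Clears the state so the instance can be reused for the next key. */
    void reset();

    /** Adds one column value to the aggregate; {@code null} values are ignored. */
    void aggregate(T value);
}

/** Hypothetical INT sum: here {@code T} is {@code Integer}. */
class IntSum implements TypedAggregator<Integer> {
    private static final long serialVersionUID = 1L;

    private Integer sum;

    @Override
    public Integer getResult() {
        return sum;
    }

    @Override
    public void reset() {
        sum = null;
    }

    @Override
    public void aggregate(Integer value) {
        if (value != null) {
            sum = sum == null ? value : sum + value;
        }
    }
}
```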



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,142 @@
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for partial update. */
+public class AggregationITCase extends FileStoreTableITCase {

Review Comment:
   There are no tests for retraction. Let me find you an example.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunction.java:
##########
@@ -0,0 +1,159 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.io.Serializable;
+
+/**
+ * Custom column aggregation abstract class.
+ *
+ * @param <T>
+ */
+public interface AggregateFunction<T> extends Serializable {
+    //     T aggregator;
+
+    T getResult();
+
+    default void init() {
+        reset();
+    }

Review Comment:
   What is this method used for?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunction.java:
##########
@@ -0,0 +1,159 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.io.Serializable;
+
+/**
+ * Custom column aggregation abstract class.
+ *
+ * @param <T>
+ */
+public interface AggregateFunction<T> extends Serializable {
+    //     T aggregator;
+
+    T getResult();
+
+    default void init() {
+        reset();
+    }
+
+    void reset();
+
+    default void aggregate(Object value) {
+        aggregate(value, true);
+    }
+
+    void aggregate(Object value, boolean add);

Review Comment:
   Split this into two methods, `aggregate(Object value)` and `retract(Object value)`.
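A sketch of the suggested split, keeping the PR's `Object`-typed parameters. `RetractableAggregator` and `DoubleSumAggregator` are hypothetical names. Whether the merge tree can support retraction at all is a separate question from the shape of the interface.

```java
import java.io.Serializable;

/** Hypothetical aggregator with separate methods for adding and retracting a value. */
interface RetractableAggregator<T> extends Serializable {
    T getResult();

    void reset();

    /** Adds {@code value} to the aggregate; {@code null} values are ignored. */
    void aggregate(Object value);

    /** Removes a previously aggregated {@code value}; {@code null} values are ignored. */
    void retract(Object value);
}

/** Hypothetical DOUBLE sum; assumes every non-null value is a {@code Double}. */
class DoubleSumAggregator implements RetractableAggregator<Double> {
    private static final long serialVersionUID = 1L;

    private double sum;

    @Override
    public Double getResult() {
        return sum;
    }

    @Override
    public void reset() {
        sum = 0.0;
    }

    @Override
    public void aggregate(Object value) {
        if (value != null) {
            sum += (Double) value;
        }
    }

    @Override
    public void retract(Object value) {
        if (value != null) {
            sum -= (Double) value;
        }
    }
}
```

Compared with a `boolean add` flag, two named methods make each call site self-describing and let functions that cannot retract (such as max) throw from `retract` alone.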



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,150 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<AggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+
+    private final Set<String> aggregateColumnNames;
+
+    public AggregationMergeFunction(
+            RowType primaryKeyType, RowType rowType, Set<String> aggregateColumnNames) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregateColumnNames = aggregateColumnNames;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        Arrays.fill(isPrimaryKey, false);
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            AggregateFunction<?> f = null;
+            if (aggregateColumnNames.contains(rowNames.get(i))) {
+                f = choiceRightAggregateFunction(rowType.getTypeAt(i).getDefaultConversion());
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    private AggregateFunction<?> choiceRightAggregateFunction(Class<?> c) {
+        AggregateFunction<?> f = null;
+        if (Double.class.equals(c)) {
+            f = new DoubleAggregateFunction();
+        } else if (Long.class.equals(c)) {
+            f = new LongAggregateFunction();
+        } else if (Integer.class.equals(c)) {
+            f = new IntegerAggregateFunction();
+        } else if (Float.class.equals(c)) {
+            f = new FloatAggregateFunction();
+        }
+        return f;
+    }

Review Comment:
   A better implementation is to have a factory or util class that reads in an aggregate function name and its type and returns the corresponding function.
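A sketch of a factory that takes both the function name and the column type, assuming the type is represented by its type root rather than a `Class<?>`. `TypeRoot` is a local stand-in for a few constants of Flink's `org.apache.flink.table.types.logical.LogicalTypeRoot`, and `SumFunctions` / `Summer` are hypothetical names. Null handling is omitted for brevity.

```java
/** Local stand-in for a few constants of org.apache.flink.table.types.logical.LogicalTypeRoot. */
enum TypeRoot { INTEGER, BIGINT, FLOAT, DOUBLE, VARCHAR }

/** Hypothetical function combining an accumulated value with a new one (both non-null). */
interface Summer {
    Object add(Object acc, Object value);
}

final class SumFunctions {
    private SumFunctions() {}

    /** Returns the aggregate function for {@code name} on a column whose type root is {@code root}. */
    static Summer create(String name, TypeRoot root) {
        if (!"sum".equalsIgnoreCase(name)) {
            throw new IllegalArgumentException("Unsupported aggregate function: " + name);
        }
        // One implementation per supported type, mirroring the Integer/Long/Float/Double split in the PR.
        switch (root) {
            case INTEGER:
                return (acc, v) -> (Integer) acc + (Integer) v;
            case BIGINT:
                return (acc, v) -> (Long) acc + (Long) v;
            case FLOAT:
                return (acc, v) -> (Float) acc + (Float) v;
            case DOUBLE:
                return (acc, v) -> (Double) acc + (Double) v;
            default:
                throw new IllegalArgumentException("sum is not supported for type " + root);
        }
    }
}
```

Failing fast for unsupported types at construction time avoids silently storing a `null` function, which the PR's `choiceRightAggregateFunction` currently does.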



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,150 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<AggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+
+    private final Set<String> aggregateColumnNames;
+
+    public AggregationMergeFunction(
+            RowType primaryKeyType, RowType rowType, Set<String> aggregateColumnNames) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregateColumnNames = aggregateColumnNames;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        Arrays.fill(isPrimaryKey, false);

Review Comment:
   No need. `false` is the default value.





[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1134781989

   I just rebased onto master, sorry. Please ignore this push.




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r883447651


##########
docs/content/docs/development/create-table.md:
##########
@@ -268,3 +268,39 @@ For example, the inputs:
 
 Output: 
 - <1, 25.2, 20, 'This is a book'>
+
+## Aggregation Update
+
+You can configure partial update from options:
+
+```sql
+CREATE TABLE MyTable (
+  a STRING,
+  b INT,
+  c INT,
+  PRIMARY KEY (a) NOT ENFORCED 
+) WITH (
+  'merge-engine'='aggregation'

Review Comment:
   We also need to specify what aggregate functions are used for each field.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SumAggregateFunction.java:
##########
@@ -0,0 +1,151 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.io.Serializable;
+
+/** Custom column aggregation abstract class. */
+public interface SumAggregateFunction<T> extends Serializable {

Review Comment:
   This interface can also be used by other aggregate functions, not only sum. Make this a more generic aggregate function interface.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunctionFactory.java:
##########
@@ -0,0 +1,71 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Aggregate Function Factory is used to get the aggregate type based on the configuration Each
+ * aggregate type has its own aggregate function factory Different implementation classes are given
+ * for different data types.
+ */
+public class AggregateFunctionFactory {
+
+    /** SumFactory. */
+    public static class SumAggregateFunctionFactory {
+        static SumAggregateFunction<?> choiceRightAggregateFunction(Class<?> c) {

Review Comment:
   Use `LogicalType` instead of `Class<?>`. See `TypeUtils` class in `flink-table-store-common` module for an example.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunctionFactory.java:
##########
@@ -0,0 +1,71 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Aggregate Function Factory is used to get the aggregate type based on the configuration Each
+ * aggregate type has its own aggregate function factory Different implementation classes are given
+ * for different data types.
+ */
+public class AggregateFunctionFactory {
+
+    /** SumFactory. */
+    public static class SumAggregateFunctionFactory {
+        static SumAggregateFunction<?> choiceRightAggregateFunction(Class<?> c) {
+            SumAggregateFunction<?> f = null;
+            if (Double.class.equals(c)) {
+                f = new DoubleSumAggregateFunction();
+            } else if (Long.class.equals(c)) {
+                f = new LongSumAggregateFunction();
+            } else if (Integer.class.equals(c)) {
+                f = new IntegerSumAggregateFunction();
+            } else if (Float.class.equals(c)) {
+                f = new FloatSumAggregateFunction();
+            }
+            return f;
+        }
+    }
+
+    public static AggregationKind getAggregationKind(Collection<String> values) {

Review Comment:
   `Enum`s in Java have a `valueOf` method, which transforms a `String` into the corresponding `Enum` constant.
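For example, if the constants are declared in upper case, the lookup reduces to `valueOf` plus a check that all columns agree. This is a sketch: the upper-case constant names and the `fromOptionValues` helper are assumptions. Note that `valueOf` is case-sensitive and throws `IllegalArgumentException` for unknown names, so the PR's current mixed-case constants (`Sum`, `Avg`, ...) would not match an option value such as `sum` without normalization.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical variant of the PR's AggregationKind with upper-case constants. */
enum AggregationKind {
    SUM, AVG, MAX, MIN;

    /** Resolves all configured values to a single kind, rejecting mixed or unknown values. */
    static AggregationKind fromOptionValues(Collection<String> values) {
        Set<AggregationKind> kinds = new HashSet<>();
        for (String value : values) {
            // valueOf throws IllegalArgumentException for names that match no constant.
            kinds.add(AggregationKind.valueOf(value.trim().toUpperCase(Locale.ROOT)));
        }
        if (kinds.size() != 1) {
            throw new IllegalArgumentException(
                    "Expected exactly one aggregate function for all columns, got " + values);
        }
        return kinds.iterator().next();
    }
}
```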



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SumAggregateMergeFunction.java:
##########
@@ -0,0 +1,142 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class SumAggregateMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<SumAggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+
+    private final Set<String> aggregateColumnNames;
+
+    public SumAggregateMergeFunction(
+            RowType primaryKeyType, RowType rowType, Set<String> aggregateColumnNames) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregateColumnNames = aggregateColumnNames;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            SumAggregateFunction<?> f = null;
+            if (aggregateColumnNames.contains(rowNames.get(i))) {
+                f =
+                        AggregateFunctionFactory.SumAggregateFunctionFactory
+                                .choiceRightAggregateFunction(
+                                        rowType.getTypeAt(i).getDefaultConversion());
+            } else {
+                if (!isPrimaryKey[i]) {
+                    throw new IllegalArgumentException(
+                            "should  set aggregate function for every column not part of primary key");
+                }
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            SumAggregateFunction<?> f = aggregateFunctions.get(i);
+            if (isPrimaryKey[i]) {
+                // primary key
+                if (currentField != null) {
+                    row.setField(i, currentField);
+                }
+            } else {
+                if (f != null) {
+                    f.reset();
+                    Object oldValue = row.getField(i);
+                    if (oldValue != null) {
+                        f.aggregate(oldValue);
+                    }
+                    switch (value.getRowKind()) {
+                        case INSERT:
+                            f.aggregate(currentField);
+                            break;
+                        case DELETE:
+                            f.retract(currentField);

Review Comment:
   See [my comments](https://github.com/apache/flink-table-store/pull/121#issuecomment-1132579400). I'm afraid currently we have no way to support retraction.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java:
##########
@@ -232,12 +235,29 @@ public static FileStoreImpl createWithPrimaryKey(
                 mergeFunction = new DeduplicateMergeFunction();
                 break;
             case PARTIAL_UPDATE:
-                List<LogicalType> fieldTypes = rowType.getChildren();
-                RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
-                for (int i = 0; i < fieldTypes.size(); i++) {
-                    fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+                mergeFunction = new PartialUpdateMergeFunction(rowType);
+                break;
+            case AGGREGATION:
+                Map<String, String> rightConfMap =
+                        options.getFilterConf(e -> e.getKey().endsWith(".aggregate-function"));
+                Set<String> aggregateColumnNames =
+                        rightConfMap.keySet().stream()
+                                .distinct()
+                                .flatMap(s -> Stream.of(s.split(".aggregate-function")[0]))
+                                .collect(Collectors.toSet());
+                switch (AggregateFunctionFactory.getAggregationKind(rightConfMap.values())) {
+                    case Sum:
+                        mergeFunction =
+                                new SumAggregateMergeFunction(
+                                        primaryKeyType, rowType, aggregateColumnNames);
+                        break;
+                    case Avg:
+                    case Max:
+                    case Min:
+                    default:
+                        throw new UnsupportedOperationException(
+                                "merge-function values un supposed");

Review Comment:
   Extract this to a separate factory class. We'd like each method to be simple. Maybe the outer `switch` statement should also be extracted?
   
   If you're extracting both `switch` statements, I would suggest creating two factory classes. Each class should complete one specific task.
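A sketch of the two-factory split described above: one factory picks a merge function from the merge engine, the other picks how values are combined under the aggregation engine. All names (`MergeEngineKind`, `Merger`, `MergerFactory`, `AggregationStrategyFactory`) are hypothetical stand-ins rather than the PR's classes. Rows are modeled as `long[]`, and primary-key columns are ignored for brevity.

```java
import java.util.function.LongBinaryOperator;

/** Hypothetical stand-in for the merge-engine option. */
enum MergeEngineKind { DEDUPLICATE, AGGREGATION }

/** Hypothetical merge function over two rows with the same key, modeled as long[]. */
interface Merger {
    long[] merge(long[] previous, long[] incoming);
}

/** Factory 1: picks a merge function for the configured merge engine. */
final class MergerFactory {
    private MergerFactory() {}

    static Merger create(MergeEngineKind engine, String aggregateFunction) {
        switch (engine) {
            case DEDUPLICATE:
                return (previous, incoming) -> incoming; // keep the latest row
            case AGGREGATION:
                return AggregationStrategyFactory.create(aggregateFunction);
            default:
                throw new UnsupportedOperationException("Unsupported merge engine: " + engine);
        }
    }
}

/** Factory 2: picks how values are combined for the AGGREGATION engine. */
final class AggregationStrategyFactory {
    private AggregationStrategyFactory() {}

    static Merger create(String function) {
        switch (function) {
            case "sum":
                return (previous, incoming) -> combine(previous, incoming, Long::sum);
            case "max":
                return (previous, incoming) -> combine(previous, incoming, Math::max);
            default:
                throw new UnsupportedOperationException("Unsupported aggregate function: " + function);
        }
    }

    /** Applies op column by column; both rows are assumed to have the same length. */
    private static long[] combine(long[] previous, long[] incoming, LongBinaryOperator op) {
        long[] out = new long[previous.length];
        for (int i = 0; i < previous.length; i++) {
            out[i] = op.applyAsLong(previous[i], incoming[i]);
        }
        return out;
    }
}
```

With this split, `FileStoreImpl` only calls `MergerFactory.create(...)`, and adding an aggregate function touches only the second factory.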



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SumAggregateMergeFunction.java:
##########
@@ -0,0 +1,142 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class SumAggregateMergeFunction implements MergeFunction {

Review Comment:
   Ditto. Make this into a more generic `MergeFunction`.
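A minimal sketch of a more generic merge function, assuming one aggregator per non-key column (chosen from configuration) and primary-key columns that keep the latest non-null value, as in the PR. Rows are modeled as `Object[]` instead of Flink's `RowData`, and `FieldAggregator` / `GenericAggregationMerger` are hypothetical names.

```java
import java.util.Arrays;
import java.util.function.BinaryOperator;

/** Hypothetical per-field aggregator: combines the accumulated value with a new one. */
interface FieldAggregator extends BinaryOperator<Object> {}

/** Hypothetical generic merge function; nothing in it is specific to sum. */
class GenericAggregationMerger {
    private final FieldAggregator[] aggregators; // a null entry marks a primary-key column
    private Object[] row;

    GenericAggregationMerger(FieldAggregator[] aggregators) {
        this.aggregators = aggregators.clone();
    }

    /** Must be called before the first add() for each key. */
    void reset() {
        row = new Object[aggregators.length];
    }

    void add(Object[] value) {
        for (int i = 0; i < aggregators.length; i++) {
            Object field = value[i];
            if (field == null) {
                continue; // nothing to merge for this column
            }
            if (aggregators[i] == null || row[i] == null) {
                row[i] = field; // key column, or the first value seen for this column
            } else {
                row[i] = aggregators[i].apply(row[i], field);
            }
        }
    }

    Object[] getValue() {
        return Arrays.copyOf(row, row.length);
    }
}
```

Because the per-column functions are injected, the same class serves sum, max, or a mix of functions across columns.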





[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1143175449

   ![image](https://user-images.githubusercontent.com/37119488/171341132-3e196df5-c65d-4cc8-b13a-9d8ff80b0d77.png)
   Do you know how to assert that two double (or float) values are equal? Please run AggregationITCase#testMergeRead and AggregationITCase#testMergeCompaction to observe the results.
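Exact equality is usually the wrong check for floating-point sums, because values such as 0.1 have no exact binary representation and the result can depend on the order of additions. A common approach is to compare within a tolerance. With AssertJ, which the ITCase already uses, that is `assertThat(actual).isCloseTo(expected, within(1e-9))`. The plain-Java sketch below shows the same idea; `approximatelyEqual` and the tolerance value are illustrative choices, not something from the PR.

```java
/** Illustrative tolerance-based comparison for floating-point aggregation results. */
final class FloatingPointCompare {
    private FloatingPointCompare() {}

    /**
     * Returns true if |expected - actual| <= tolerance. NaN never compares equal.
     * AssertJ equivalent: assertThat(actual).isCloseTo(expected, within(tolerance)).
     */
    static boolean approximatelyEqual(double expected, double actual, double tolerance) {
        return Math.abs(expected - actual) <= tolerance;
    }
}
```

For whole `Row` results, one option is to extract the floating-point fields and compare them individually with a tolerance, while comparing keys and integer fields exactly.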




[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r874815823


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,81 @@
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ITCase for partial update.
+ */
+public class AggregationITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T3 (" + "a STRING," + "b INT," + "c INT ," + "PRIMARY KEY (a) NOT ENFORCED )" + " WITH ('merge-engine'='aggregation'     );");
+    }
+
+    @Test
+    public void testMergeInMemory() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T3 VALUES " + "('pk1',1, 2), " + "('pk1',1, 2)").await();
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 2, 4));
+    }
+
+    @Test
+    public void testMergeRead() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 2)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 4)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',2, 0)").await();
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 4, 6));
+    }
+
+
+    @Test
+    public void testMergeCompaction() throws ExecutionException, InterruptedException {
+        // Wait compaction
+        bEnv.executeSql("ALTER TABLE T3 SET ('commit.force-compact'='true')");
+
+        // key pk1
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 3, 1)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 5)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 6)").await();
+
+        // key pk2
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 6,7)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 9,0)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 4,4)").await();
+
+        List<Row> result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1",11,12), Row.of("pk2",19,11));
+    }
+
+//    @Test

Review Comment:
   > Usually, it's not appropriate to leave commented code. If it's useless, remove it instead.
   
   This test example comes from `flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java`.
   I'm not sure whether this test makes sense for `flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java`.
   Do I need to keep this test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r872947013


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {

Review Comment:
   Should the `UPDATE_BEFORE` and `UPDATE_AFTER` row kinds be supported for aggregation? I don't know how to handle this situation.





[GitHub] [flink-table-store] tsreaper commented on pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1126717355

   > If I want to test the same MergeFunction on tables with different structures, what should I do? (Is it possible to create a new Test file again?)
   
   In that case just create multiple tables with different names in `ddl` method. Table names should be different within the same test class as they'll be created all at once (see `FileStoreTableITCase`). However different tests run in different temporary directories so it is OK to have the same table name across test classes.
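   As a rough illustration of that advice, the stand-alone Java sketch below returns two DDL statements with distinct table names from a `ddl()` method. It does not extend the real `FileStoreTableITCase`; the class name `MultiTableDdlSketch`, the second table `T4` and its columns are hypothetical, and the option keys are simply the ones used in this PR.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch. In the real test class this would be an
// @Override of FileStoreTableITCase#ddl(); T4 and its columns are made up.
class MultiTableDdlSketch {

    static List<String> ddl() {
        return Arrays.asList(
                // Table with a single-column primary key and two summed columns.
                "CREATE TABLE IF NOT EXISTS T3 ("
                        + " a STRING, b INT, c INT,"
                        + " PRIMARY KEY (a) NOT ENFORCED)"
                        + " WITH ('merge-engine'='aggregation',"
                        + " 'b.aggregate-function'='sum',"
                        + " 'c.aggregate-function'='sum')",
                // Table with a different structure. Its name must differ from T3
                // because all tables of one test class are created together.
                "CREATE TABLE IF NOT EXISTS T4 ("
                        + " j INT, k INT, v BIGINT,"
                        + " PRIMARY KEY (j, k) NOT ENFORCED)"
                        + " WITH ('merge-engine'='aggregation',"
                        + " 'v.aggregate-function'='sum')");
    }

    public static void main(String[] args) {
        ddl().forEach(System.out::println);
    }
}
```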




[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1135419375

   > You can try this test case if you're interested.
   > 
   > ```java
   > @Test
   > public void myTest() throws Exception {
   >     String ddl3 =
   >             "CREATE TABLE IF NOT EXISTS T5 ( dt STRING, hr INT, price INT, PRIMARY KEY (dt, hr) NOT ENFORCED ) WITH ( 'merge-engine' = 'aggregation', 'price.aggregate-function' = 'sum' );";
   >     String tmpPath;
   >     try {
   >         tmpPath = TEMPORARY_FOLDER.newFolder().toURI().toString();
   >     } catch (Exception e) {
   >         throw new RuntimeException(e);
   >     }
   >     String ddl4 =
   >             "CREATE TABLE IF NOT EXISTS A ( dt STRING, hr INT, price INT ) WITH ( 'connector' = 'filesystem', 'path' = '"
   >                     + tmpPath
   >                     + "', 'format' = 'avro' );";
   >     String ddl5 =
   >             "CREATE TABLE IF NOT EXISTS P ( dt STRING, hr INT, price INT ) WITH ( 'connector' = 'print' );";
   >     bEnv.executeSql(ddl3).await();
   >     bEnv.executeSql(ddl4).await();
   >     sEnv.executeSql(ddl3).await();
   >     sEnv.executeSql(ddl4).await();
   >     sEnv.executeSql(ddl5).await();
   >     bEnv.executeSql(
   >                     "INSERT INTO A VALUES ('20220101', 8, 100), ('20220101', 8, 300), ('20220101', 8, 200), ('20220101', 8, 400), ('20220101', 9, 100)")
   >             .await();
   >     sEnv.executeSql(
   >                     "INSERT INTO T5 SELECT dt, hr, price FROM ("
   >                             + "  SELECT dt, hr, price, ROW_NUMBER() OVER (PARTITION BY dt, hr ORDER BY price desc) AS rn FROM A"
   >                             + ") WHERE rn <= 2")
   >             .await();
   >     sEnv.executeSql(
   >                     "INSERT INTO P SELECT dt, hr, price FROM ("
   >                             + "  SELECT dt, hr, price, ROW_NUMBER() OVER (PARTITION BY dt, hr ORDER BY price desc) AS rn FROM A"
   >                             + ") WHERE rn <= 2")
   >             .await();
   >     List<Row> result = iteratorToList(bEnv.from("T5").execute().collect());
   >     System.out.println(result);
   > }
   > ```
   > 
   > This SQL script calculates the sum of the 2 largest prices in each hour. I've also created a print sink so that you can see what's going into the sink.
   > 
   > ```
   > 1> +I[20220101, 8, 100]
   > 1> +I[20220101, 8, 300] # insert price 100 and 300
   > 1> -D[20220101, 8, 100]
   > 1> +I[20220101, 8, 200] # price 200 comes, so 100 should be removed out of the result
   > 1> -D[20220101, 8, 200]
   > 1> +I[20220101, 8, 400] # price 400 comes, so 200 should be removed out of the result
   > 1> +I[20220101, 9, 100] # 100 in another hour, not affected
   > ```
   > 
   > The expected result of our aggregate function should be `[+I[20220101, 8, 700], +I[20220101, 9, 100]]`, but sadly the current implementation prints out `[+I[20220101, 8, 1300], +I[20220101, 9, 100]]`.
   > 
   > Sorry that my previous comments on row kinds were somewhat misleading. Flink does have 4 row kinds, but in Table Store we only consider key-value pairs instead of rows. Key-value pairs only have two value kinds (provided by `KeyValue#valueKind`): `ValueKind.ADD` means updating the corresponding key with the value (you can think of it as `Map#compute` in Java) and `ValueKind.DELETE` means removing the corresponding key (like `Map#remove` in Java). Although each key and each value is represented by a `RowData`, their row kinds are meaningless and are always `INSERT`. Only their value kinds are important.
   > 
   > To connect Table Store with Flink, we parse `RowData` from Flink into `KeyValue` according to both its row kind and the merge function used in the `flink-table-store-connector` module. As the number of merge functions is growing, we should consider extracting a common method to complete this parsing. But that is out of the scope of this PR.
   > 
   > For this PR I suggest that we only support the `INSERT` row kind and leave support for the other row kinds for later. All in all, each PR should be as small as possible and concentrate on only one thing.
   
   
   Sorry, I didn't understand what you meant.
   Your test executes the SQL
   > INSERT INTO A VALUES ('20220101', 8, 100), ('20220101', 8, 300), ('20220101', 8, 200), ('20220101', 8, 400), ('20220101', 9, 100)
   
   but I couldn't find where `-D[20220101, 8, 200]` comes from.
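   The `-D` rows in the print output are not written by the `INSERT INTO A VALUES ...` statement; they are retractions emitted by the streaming Top-N query (`ROW_NUMBER() ... WHERE rn <= 2`) whenever a price drops out of the top two. The stand-alone Java sketch below replays the printed changelog for key `(20220101, 8)` and shows why a sum that ignores the row kind yields 1300 instead of 700. `RowKindSketch`, `Change` and `RetractableSum` are hypothetical names; `RowKindSketch` only stands in for Flink's `RowKind` so the example compiles without Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.types.RowKind.
enum RowKindSketch { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

class Change {
    final RowKindSketch kind;
    final int price;

    Change(RowKindSketch kind, int price) {
        this.kind = kind;
        this.price = price;
    }
}

class RetractableSum {

    /** Treats every change as an addition, which is what produces 1300. */
    static int naiveSum(List<Change> changes) {
        int sum = 0;
        for (Change c : changes) {
            sum += c.price;
        }
        return sum;
    }

    /** Adds accumulate messages and subtracts retract messages. */
    static int sumWithRetraction(List<Change> changes) {
        int sum = 0;
        for (Change c : changes) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    sum += c.price;
                    break;
                case DELETE:
                case UPDATE_BEFORE:
                    sum -= c.price;
                    break;
                default:
                    throw new IllegalStateException("Unknown kind: " + c.kind);
            }
        }
        return sum;
    }

    /** The changelog printed for key (20220101, 8) in the example above. */
    static List<Change> hourEightChangelog() {
        List<Change> changes = new ArrayList<>();
        changes.add(new Change(RowKindSketch.INSERT, 100));
        changes.add(new Change(RowKindSketch.INSERT, 300));
        changes.add(new Change(RowKindSketch.DELETE, 100));
        changes.add(new Change(RowKindSketch.INSERT, 200));
        changes.add(new Change(RowKindSketch.DELETE, 200));
        changes.add(new Change(RowKindSketch.INSERT, 400));
        return changes;
    }

    public static void main(String[] args) {
        List<Change> changes = hourEightChangelog();
        System.out.println("naive: " + naiveSum(changes)); // 1300
        System.out.println("with retraction: " + sumWithRetraction(changes)); // 700
    }
}
```

   This only illustrates the arithmetic; how such retractions map onto Table Store's `ValueKind` is a separate question.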




[GitHub] [flink-table-store] LadyForest commented on pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
LadyForest commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1125633950

   Hi @ajian2002, thanks for your interest in this project. TableStore follows the same code of conduct as Flink, so you may find these handy manuals helpful as well.
   
   - [Flink Contributor Quick-Start Guide](https://segmentfault.com/a/1190000025176825)
   - [Flink Community Growth Tutorial](https://flink-learning.org.cn/activity/detail/aad55aed19879dc61ad42c3ed1a4fb01)
   
   These references will guide you through the entire process: how to create or apply for a Jira ticket, check code style/compilation/tests, follow the commit message format, etc.




[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1128911422

   How should I configure `ConfigOption<MergeEngine> MERGE_ENGINE` to get `columnName.aggregate-function = sum` from the `FileStoreOptions` parameter of the `static FileStoreImpl createWithPrimaryKey` function?




[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1131592823

   Are there any remaining bugs? Can it be merged?




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r885565159


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateMergeFunction.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record,
+ * aggregate specifies field on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregateMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<ColumnAggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+    private final Map<String, AggregationKind> aggregationKindMap;
+
+    public AggregateMergeFunction(
+            RowType primaryKeyType,
+            RowType rowType,
+            Map<String, AggregationKind> aggregationKindMap) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregationKindMap = aggregationKindMap;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            ColumnAggregateFunction<?> f = null;
+            if (aggregationKindMap.containsKey(rowNames.get(i))) {
+                f =
+                        ColumnAggregateFunctionFactory.getColumnAggregateFunction(
+                                aggregationKindMap.get(rowNames.get(i)), rowType.getTypeAt(i));
+            } else {
+                if (!isPrimaryKey[i]) {
+                    throw new IllegalArgumentException(
+                            "should  set aggregate function for every column not part of primary key");
+                }
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            ColumnAggregateFunction<?> f = aggregateFunctions.get(i);
+            if (isPrimaryKey[i]) {
+                // primary key
+                if (currentField != null) {
+                    row.setField(i, currentField);
+                }
+            } else {
+                if (f != null) {
+                    f.reset();
+                    Object oldValue = row.getField(i);
+                    if (oldValue != null) {
+                        f.aggregate(oldValue);
+                    }
+                    switch (value.getRowKind()) {
+                        case INSERT:
+                            f.aggregate(currentField);
+                            break;
+                        case DELETE:
+                        case UPDATE_AFTER:
+                        case UPDATE_BEFORE:
+                        default:
+                            throw new UnsupportedOperationException(
+                                    "Unsupported row kind: " + row.getRowKind());
+                    }
+                    Object result = f.getResult();
+                    if (result != null) {
+                        row.setField(i, result);

Review Comment:
   `AggregateMergeFunction#reset` will be called before adding the first row of a new primary key, and `AggregateMergeFunction#getValue` will be called after the last row of that primary key. See `SortMergeReader` if you're interested.
   
   > I don't know when AggregateMergeFunction#reset and AggregateMergeFunction#getValue are called, and the breakpoints are not hit while running the test.
   
   Are you sure about this? My breakpoints at `AggregateMergeFunction#reset` and `AggregateMergeFunction#getValue` are triggered when running `AggregationITCase`.
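   To make that call order concrete, here is a deliberately simplified Java sketch of a driver that walks key-value pairs already sorted by key and calls `reset()`, `add()` and `getValue()` around each key group. `SimpleMergeFunction`, `SumMergeFunction` and `MergeDriverSketch` are hypothetical names; this is not the real `SortMergeReader`, which merges several sorted runs. The sample values are column `b` from `testMergeCompaction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified mirror of the MergeFunction contract discussed here.
interface SimpleMergeFunction {
    void reset();

    void add(long value);

    long getValue();
}

class SumMergeFunction implements SimpleMergeFunction {
    private long sum;

    @Override
    public void reset() {
        sum = 0;
    }

    @Override
    public void add(long value) {
        sum += value;
    }

    @Override
    public long getValue() {
        return sum;
    }
}

class MergeDriverSketch {

    /** Each entry is {key, value}; entries must already be sorted by key. */
    static List<String> mergeSortedRun(List<String[]> sorted, SimpleMergeFunction f) {
        List<String> out = new ArrayList<>();
        String currentKey = null;
        for (String[] kv : sorted) {
            if (!kv[0].equals(currentKey)) {
                if (currentKey != null) {
                    // The last value of the previous key has been added.
                    out.add(currentKey + "=" + f.getValue());
                }
                currentKey = kv[0];
                f.reset(); // called before the first value of a new key
            }
            f.add(Long.parseLong(kv[1]));
        }
        if (currentKey != null) {
            out.add(currentKey + "=" + f.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> run = Arrays.asList(
                new String[] {"pk1", "3"}, new String[] {"pk1", "4"}, new String[] {"pk1", "4"},
                new String[] {"pk2", "6"}, new String[] {"pk2", "9"}, new String[] {"pk2", "4"});
        System.out.println(mergeSortedRun(run, new SumMergeFunction())); // [pk1=11, pk2=19]
    }
}
```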





[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1139716861

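   > @tsreaper I implemented a column aggregate function interface for every column: all the different aggregation kinds extend the column aggregate function interface, and each data type implements the column aggregate function interface of the corresponding kind. These three layers give us enough flexibility.
   Now we can specify different aggregate functions for different columns, but I haven't implemented any kind other than sum yet.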
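   One possible reading of that three-layer structure, sketched in plain Java: a column-level interface, a per-kind sub-interface, and per-type implementations. Only the interface name `ColumnAggregateFunction` and the method names `reset`, `aggregate` and `getResult` come from the PR's diff; `ColumnSumAggregateFunction`, the `*SumAggregateFunction` classes and the string-based factory are assumptions (the PR's `ColumnAggregateFunctionFactory` takes an `AggregationKind` and a `LogicalType` instead).

```java
// Hypothetical sketch. Only ColumnAggregateFunction and its method names
// (reset, aggregate, getResult) mirror the PR; everything else is assumed.

/** Layer 1: the per-column contract used by the merge function. */
interface ColumnAggregateFunction<T> {
    void reset();

    void aggregate(Object value);

    T getResult();
}

/** Layer 2: one sub-interface per aggregation kind (only SUM here). */
interface ColumnSumAggregateFunction<T> extends ColumnAggregateFunction<T> {}

/** Layer 3: one implementation per data type for a given kind. */
class IntSumAggregateFunction implements ColumnSumAggregateFunction<Integer> {
    private Integer sum;

    @Override
    public void reset() {
        sum = null;
    }

    @Override
    public void aggregate(Object value) {
        if (value == null) {
            return; // null inputs do not change the sum
        }
        int v = (Integer) value;
        sum = sum == null ? v : sum + v;
    }

    @Override
    public Integer getResult() {
        return sum;
    }
}

class LongSumAggregateFunction implements ColumnSumAggregateFunction<Long> {
    private Long sum;

    @Override
    public void reset() {
        sum = null;
    }

    @Override
    public void aggregate(Object value) {
        if (value == null) {
            return;
        }
        long v = (Long) value;
        sum = sum == null ? v : sum + v;
    }

    @Override
    public Long getResult() {
        return sum;
    }
}

class ColumnAggregateFunctionSketch {

    /** Chooses the implementation once per column, not once per record. */
    static ColumnAggregateFunction<?> create(String kind, String typeName) {
        if (!"sum".equalsIgnoreCase(kind)) {
            throw new UnsupportedOperationException("Unsupported aggregation kind: " + kind);
        }
        switch (typeName) {
            case "INT":
                return new IntSumAggregateFunction();
            case "BIGINT":
                return new LongSumAggregateFunction();
            default:
                throw new UnsupportedOperationException("SUM is not supported for type " + typeName);
        }
    }

    public static void main(String[] args) {
        ColumnAggregateFunction<?> f = create("sum", "INT");
        f.reset();
        f.aggregate(3);
        f.aggregate(4);
        System.out.println(f.getResult()); // 7
    }
}
```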
   




[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r877701051


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java:
##########
@@ -201,17 +204,28 @@ public static FileStoreImpl createWithPrimaryKey(
                                 .collect(Collectors.toList()));
 
         MergeFunction mergeFunction;
+        Map<String, String> rightConfMap =
+                options.getFilterConf(e -> e.getKey().endsWith(".aggregate-function"));

Review Comment:
   Should I also consider that aggregate functions added later as extensions may need a method to obtain the same ConfMap?





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r871965386


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++)
+        {
+            Object currentField = getters[i].getFieldOrNull(value);
+            Object oldValue = row.getField(i);
+            Object result = sum(oldValue, currentField);
+            if (result != null)
+            {
+                row.setField(i, result);
+            }
+        }
+    }
+
+    private Object sum(Object oldValue, Object currentField) {
+        if (currentField == null)
+        {
+            return null;
+        }
+        if (oldValue == null)
+        {
+            return currentField;
+        }
+        if (oldValue instanceof Integer && currentField instanceof Integer)
+        {
+            return Integer.sum((Integer) oldValue, (Integer) currentField);
+        }
+        else if (oldValue instanceof Long && currentField instanceof Long)
+        {
+            return Long.sum((Long) oldValue, (Long) currentField);
+        }

Review Comment:
   These `instanceof` checks will be performed on a per-record basis and have a big impact on performance. That's why we use `RowData.FieldGetter` instead of `instanceof` to fetch columns out of a row.
   
   A better approach is to create an `Aggregator` class just like `RowData.FieldGetter` and in the constructor of `AggregationMergeFunction` we choose `Aggregator` for each column according to their types. So on a per-record path we can use the `Aggregator` directly without these checks.
   
   An even better approach is to use Flink's code generation system, but that might be too complex for your first contribution, considering its usage and the fact that the internal system of the Flink table planner is now awkward to call from the outside due to the Scala-free change in Flink 1.15. If you're interested, see `AggWithoutKeysCodeGenerator` in Flink.
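   A minimal sketch of the suggested `Aggregator` approach, assuming plain `Object[]` rows in place of `RowData` and a hypothetical `ColumnType` enum in place of Flink's logical types: the aggregator for each column is picked once in the constructor, so `add()` only calls it. Names other than `Aggregator` are illustrative, and only SUM is shown.

```java
import java.util.Arrays;

// Hypothetical stand-in for Flink's logical type roots; KEY marks primary-key columns.
enum ColumnType { KEY, INT, BIGINT, DOUBLE }

@FunctionalInterface
interface Aggregator {
    /** Returns the new accumulated value; either argument may be null. */
    Object aggregate(Object accumulator, Object input);
}

class AggregatorSumMerger {
    private final Aggregator[] aggregators;
    private Object[] row;

    AggregatorSumMerger(ColumnType[] types) {
        // Type dispatch happens once here, not once per record.
        aggregators = new Aggregator[types.length];
        for (int i = 0; i < types.length; i++) {
            aggregators[i] = createSumAggregator(types[i]);
        }
    }

    private static Aggregator createSumAggregator(ColumnType type) {
        switch (type) {
            case KEY: // keep the latest non-null key value
                return (acc, in) -> in != null ? in : acc;
            case INT:
                return nullSafe((acc, in) -> (Integer) acc + (Integer) in);
            case BIGINT:
                return nullSafe((acc, in) -> (Long) acc + (Long) in);
            case DOUBLE:
                return nullSafe((acc, in) -> (Double) acc + (Double) in);
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    /** A null input keeps the accumulator; a null accumulator takes the input. */
    private static Aggregator nullSafe(Aggregator sum) {
        return (acc, in) -> in == null ? acc : (acc == null ? in : sum.aggregate(acc, in));
    }

    void reset() {
        row = new Object[aggregators.length];
    }

    void add(Object[] value) {
        // Per-record path: no instanceof, just the pre-selected aggregators.
        for (int i = 0; i < aggregators.length; i++) {
            row[i] = aggregators[i].aggregate(row[i], value[i]);
        }
    }

    Object[] getValue() {
        return Arrays.copyOf(row, row.length);
    }

    public static void main(String[] args) {
        AggregatorSumMerger merger = new AggregatorSumMerger(
                new ColumnType[] {ColumnType.KEY, ColumnType.INT, ColumnType.INT});
        merger.reset();
        merger.add(new Object[] {"pk1", 1, 2});
        merger.add(new Object[] {"pk1", 1, 2});
        System.out.println(Arrays.toString(merger.getValue())); // [pk1, 2, 4]
    }
}
```

   The design mirrors `RowData.createFieldGetter`: the switch on the column type is paid once per column when the merge function is built, while the per-record loop only performs virtual calls.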





[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r874650119


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+    private final RowType primaryKeyType;
+    private final RowType rowType;
+    private final Set<String> primaryKeyNames;
+    private final ArrayList<String> rowNames;
+
+    private final ArrayList<AggregationFunction<Object>> types;

Review Comment:
   > `aggregateFunctions` might be better.
   
   Do you mean `org.apache.flink.table.functions.AggregateFunction<T, ACC>` in `flink-table-common`?
   Can I reuse any existing interface/implementation class, or do I need to define an aggregator interface and implementation classes for the various data types?





[GitHub] [flink-table-store] tsreaper commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1129520834

   > How should I configure `ConfigOption<MergeEngine> MERGE_ENGINE` to get `columnName.aggregate-function = sum` from the `FileStoreOptions` parameter of the `static FileStoreImpl createWithPrimaryKey` function?
   
   You can get all config options from the `FileStoreOptions.options` member. You might need to add a method to `FileStoreOptions` to fetch an arbitrary key.
   
   ```sql
   CREATE TABLE IF NOT EXISTS T (
     j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED
   ) WITH ('merge-engine'='partial-update', 'specialkey'='hahahaha');
   ```
   
   ![20220518110552](https://user-images.githubusercontent.com/19909549/168949470-9d7e88e3-08a5-4666-b064-53b995081702.jpg)
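   A sketch of such a helper, assuming the options are available as a plain `Map<String, String>` (the real `FileStoreOptions` API is not reproduced here): it collects every `<column>.aggregate-function` entry into a column-to-function map. `AggregateOptionsSketch` and `columnAggregateFunctions` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper working on a plain options map instead of FileStoreOptions.
class AggregateOptionsSketch {

    static final String SUFFIX = ".aggregate-function";

    /** Maps each column name to its (lower-cased) aggregate function name. */
    static Map<String, String> columnAggregateFunctions(Map<String, String> options) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            String key = e.getKey();
            // Skip unrelated options and a bare ".aggregate-function" key with no column.
            if (key.endsWith(SUFFIX) && key.length() > SUFFIX.length()) {
                String column = key.substring(0, key.length() - SUFFIX.length());
                result.put(column, e.getValue().trim().toLowerCase(Locale.ROOT));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("merge-engine", "aggregation");
        options.put("b.aggregate-function", "sum");
        options.put("c.aggregate-function", "sum");
        System.out.println(columnAggregateFunctions(options)); // {b=sum, c=sum}
    }
}
```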
   
   




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r876524803


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for partial update. */
+public class AggregationITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS T3 ( "
+                        + " a STRING, "
+                        + " b INT, "
+                        + " c INT, "
+                        + " PRIMARY KEY (a) NOT ENFORCED )"
+                        + " WITH ("
+                        + " 'merge-engine'='aggregation' ,"
+                        + " 'b.aggregate-function'='sum' ,"
+                        + " 'c.aggregate-function'='sum' "

Review Comment:
   If users don't care about the results of some columns, they shouldn't include those columns in the table.





[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1130012599

   > > How should I configure `ConfigOption<MergeEngine> MERGE_ENGINE` to get `columnName.aggregate-function = sum` from the `FileStoreOptions` parameter of the `static FileStoreImpl createWithPrimaryKey` function?
   > 
   > You can get all config options from the `FileStoreOptions.options` member. You might need to add a method to `FileStoreOptions` to fetch an arbitrary key.
   > 
   > ```sql
   > CREATE TABLE IF NOT EXISTS T (
   >   j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED
   > ) WITH ('merge-engine'='partial-update', 'specialkey'='hahahaha');
   > ```
   > 
   > ![20220518110552](https://user-images.githubusercontent.com/19909549/168949470-9d7e88e3-08a5-4666-b064-53b995081702.jpg)
   
   Yes, I have implemented this.
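The suggestion above is to read per-column keys straight from the raw option map instead of declaring a `ConfigOption` for every column. A minimal sketch of such a lookup, assuming the options are available as a plain `Map<String, String>`; the class and method names are hypothetical and are not part of `FileStoreOptions`:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: collects per-column aggregate functions from the raw table options. */
final class AggregateOptionsSketch {

    // Assumed key pattern "<column>.aggregate-function", taken from the DDL in AggregationITCase.
    static final String SUFFIX = ".aggregate-function";

    private AggregateOptionsSketch() {}

    /** Returns column name -> lower-cased function name for each column that sets the option. */
    static Map<String, String> aggregateFunctions(
            Map<String, String> options, List<String> columns) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String column : columns) {
            String function = options.get(column + SUFFIX);
            if (function != null) {
                result.put(column, function.trim().toLowerCase(Locale.ROOT));
            }
        }
        return result;
    }
}
```

Iterating over the known column names (rather than scanning every option key) keeps unrelated options such as `merge-engine` out of the result.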




[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r874650119


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+    private final RowType primaryKeyType;
+    private final RowType rowType;
+    private final Set<String> primaryKeyNames;
+    private final ArrayList<String> rowNames;
+
+    private final ArrayList<AggregationFunction<Object>> types;

Review Comment:
   > `aggregateFunctions` may be better.
   
   Do you mean `flink-table-common/org.apache.flink.table.functions.AggregateFunction<T, ACC>`?





[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r872990828


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++)
+        {
+            Object currentField = getters[i].getFieldOrNull(value);
+            Object oldValue = row.getField(i);
+            Object result = sum(oldValue, currentField);
+            if (result != null)
+            {
+                row.setField(i, result);
+            }
+        }
+    }
+
+    private Object sum(Object oldValue, Object currentField) {
+        if (currentField == null)
+        {
+            return null;
+        }
+        if (oldValue == null)
+        {
+            return currentField;
+        }
+        if (oldValue instanceof Integer && currentField instanceof Integer)
+        {
+            return Integer.sum((Integer) oldValue, (Integer) currentField);
+        }
+        else if (oldValue instanceof Long && currentField instanceof Long)
+        {
+            return Long.sum((Long) oldValue, (Long) currentField);
+        }

Review Comment:
   Should we use `org.apache.flink.api.java.aggregation.AggregationFunction` to handle aggregation? It is a convenient way to deal with different field types, but it also brings some problems: it is difficult for me to handle delete/update rows (`RowData#getRowKind`). How should this be solved?
   1. Modify the existing interface `org.apache.flink.api.java.aggregation.AggregationFunction` to add delete and update functions.
   2. Implement our own aggregator classes, manually switching on the class of each field.
   Is there any other solution?
   For type handling of objects/classes, I guess flink/flink-table-store already has useful utility classes that reduce the development workload (such as `org.apache.flink.api.java.aggregation.AggregationFunction`). Could you provide some guidance?
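Option 2 above (our own aggregator classes that switch on the field class) could look roughly like the sketch below. It is a hypothetical illustration, not Flink API: `Kind` is a local stand-in for Flink's `org.apache.flink.types.RowKind` so the sketch has no dependencies, and `RetractableSum` is an invented name. The class switch happens once when the aggregator is created, and `UPDATE_BEFORE`/`DELETE` rows are subtracted instead of added.

```java
import java.util.function.BinaryOperator;

/** Local stand-in mirroring the constants of Flink's RowKind (assumption, keeps the sketch self-contained). */
enum Kind {
    INSERT,
    UPDATE_BEFORE,
    UPDATE_AFTER,
    DELETE
}

/** Hypothetical retractable SUM whose arithmetic is chosen once from the field's Java class. */
final class RetractableSum {
    private final Object zero;
    private final BinaryOperator<Object> plus;
    private final BinaryOperator<Object> minus;

    private RetractableSum(Object zero, BinaryOperator<Object> plus, BinaryOperator<Object> minus) {
        this.zero = zero;
        this.plus = plus;
        this.minus = minus;
    }

    /** Switches on the field class once, instead of on every call to add(). */
    static RetractableSum forClass(Class<?> clazz) {
        if (clazz == Integer.class) {
            return new RetractableSum(
                    0, (a, b) -> (Integer) a + (Integer) b, (a, b) -> (Integer) a - (Integer) b);
        }
        if (clazz == Long.class) {
            return new RetractableSum(
                    0L, (a, b) -> (Long) a + (Long) b, (a, b) -> (Long) a - (Long) b);
        }
        if (clazz == Double.class) {
            return new RetractableSum(
                    0.0, (a, b) -> (Double) a + (Double) b, (a, b) -> (Double) a - (Double) b);
        }
        throw new IllegalArgumentException("sum is not supported for " + clazz.getName());
    }

    /** Folds one value in: INSERT/UPDATE_AFTER add it, UPDATE_BEFORE/DELETE subtract it. */
    Object merge(Object acc, Object value, Kind kind) {
        if (value == null) {
            return acc; // a null input leaves the running sum unchanged
        }
        Object base = acc == null ? zero : acc;
        boolean retract = kind == Kind.UPDATE_BEFORE || kind == Kind.DELETE;
        return retract ? minus.apply(base, value) : plus.apply(base, value);
    }
}
```

Unsupported types fail at construction time rather than silently producing `null`, which is one possible alternative to catching `UnsupportedAggregationTypeException` per column.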





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r873280827


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+    private final RowType primaryKeyType;
+    private final RowType rowType;
+    private final Set<String> primaryKeyNames;
+    private final ArrayList<String> rowNames;
+
+    private final ArrayList<AggregationFunction<Object>> types;

Review Comment:
   `aggregateFunctions` may be better.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+    private final RowType primaryKeyType;
+    private final RowType rowType;
+    private final Set<String> primaryKeyNames;
+    private final ArrayList<String> rowNames;
+
+    private final ArrayList<AggregationFunction<Object>> types;
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(
+            RowData.FieldGetter[] fieldGetters, RowType primaryKeyType, RowType rowType) {
+        this.getters = fieldGetters;
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.primaryKeyNames = new HashSet<>(primaryKeyType.getFieldNames());
+        this.rowNames = new ArrayList<>(rowType.getFieldNames());
+        this.types = new ArrayList<>(rowType.getFieldCount());
+        AggregationFunctionFactory factory = Aggregations.SUM.getFactory();
+        for (LogicalType type : rowType.getChildren()) {
+            try {
+                AggregationFunction<Object> f =
+                        factory.createAggregationFunction(
+                                (Class<Object>) type.getDefaultConversion());
+                types.add(f);
+            } catch (UnsupportedAggregationTypeException e) {
+                types.add(null);
+            }
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            AggregationFunction<Object> f = types.get(i);
+            if (primaryKeyNames.contains(rowNames.get(i))) {

Review Comment:
   Calculate the indices of the primary keys and of the other columns in the constructor, so we don't need to look them up in the set here.
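A minimal sketch of that precomputation, assuming the field names and primary-key names are available as lists (in the PR they would come from `rowType` and `primaryKeyType`); `FieldLayout` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: resolve primary-key membership once, in the constructor. */
final class FieldLayout {
    private final int[] keyIndices;
    private final int[] valueIndices;

    FieldLayout(List<String> fieldNames, List<String> primaryKeyNames) {
        Set<String> pk = new HashSet<>(primaryKeyNames);
        List<Integer> keys = new ArrayList<>();
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            if (pk.contains(fieldNames.get(i))) {
                keys.add(i);
            } else {
                values.add(i);
            }
        }
        this.keyIndices = keys.stream().mapToInt(Integer::intValue).toArray();
        this.valueIndices = values.stream().mapToInt(Integer::intValue).toArray();
    }

    /** Positions of primary-key fields, in table order (defensive copy). */
    int[] keyIndices() {
        return keyIndices.clone();
    }

    /** Positions of fields that need an aggregate function, in table order (defensive copy). */
    int[] valueIndices() {
        return valueIndices.clone();
    }
}
```

The merge function would keep these two arrays as fields, so `add()` only loops over the value positions and copies the key positions unchanged, with no per-row set lookup.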



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for partial update. */
+public class AggregationITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS T3 ( "
+                        + " a STRING, "
+                        + " b INT, "
+                        + " c INT, "
+                        + " PRIMARY KEY (a) NOT ENFORCED )"
+                        + " WITH ("
+                        + " 'merge-engine'='aggregation' ,"
+                        + " 'b.aggregate-function'='sum' ,"
+                        + " 'c.aggregate-function'='sum' "

Review Comment:
   What happens if I only set `'b.aggregate-function'='sum'`? Maybe we should throw an exception telling the user that they must set an aggregate function for every column that is not part of the primary key. Please implement this and add a test case for it.
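One way to implement the check requested above is to validate the options when the merge function is created and fail fast with an `IllegalArgumentException`. A hedged sketch: the class name, method and exact message are hypothetical, and only the `<column>.aggregate-function` key pattern comes from the DDL above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical check run when the aggregation merge engine is set up. */
final class AggregateOptionValidator {

    private AggregateOptionValidator() {}

    /** Throws if any non-primary-key column lacks a "<column>.aggregate-function" option. */
    static void validate(
            List<String> fieldNames, List<String> primaryKeyNames, Map<String, String> options) {
        Set<String> pk = new HashSet<>(primaryKeyNames);
        List<String> missing = new ArrayList<>();
        for (String field : fieldNames) {
            if (!pk.contains(field) && !options.containsKey(field + ".aggregate-function")) {
                missing.add(field);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                    "An aggregate function must be set for every column that is not part of"
                            + " the primary key; missing: "
                            + missing);
        }
    }
}
```

Listing the missing columns in the message tells the user exactly which options to add. For a table with primary key `a` and only `'b.aggregate-function'` set, column `c` would be reported.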



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+    private final RowType primaryKeyType;
+    private final RowType rowType;
+    private final Set<String> primaryKeyNames;
+    private final ArrayList<String> rowNames;
+
+    private final ArrayList<AggregationFunction<Object>> types;
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(
+            RowData.FieldGetter[] fieldGetters, RowType primaryKeyType, RowType rowType) {

Review Comment:
   Could we also create the `FieldGetter`s in the constructor to reduce the number of arguments?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+    private final RowType primaryKeyType;
+    private final RowType rowType;
+    private final Set<String> primaryKeyNames;
+    private final ArrayList<String> rowNames;
+
+    private final ArrayList<AggregationFunction<Object>> types;
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(
+            RowData.FieldGetter[] fieldGetters, RowType primaryKeyType, RowType rowType) {
+        this.getters = fieldGetters;
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.primaryKeyNames = new HashSet<>(primaryKeyType.getFieldNames());
+        this.rowNames = new ArrayList<>(rowType.getFieldNames());
+        this.types = new ArrayList<>(rowType.getFieldCount());
+        AggregationFunctionFactory factory = Aggregations.SUM.getFactory();

Review Comment:
   Nice find. I didn't even know we had such a class in Flink.
   
   However, this class is marked as `@Internal` in Flink, which means it is not a public API. Also, this class does not support retraction or other aggregate functions such as `avg`. I'm not sure whether there are better ways to do this or whether it would be better to create our own aggregate function classes. I'll leave this to the other reviewers.
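To illustrate the retraction and `avg` point: an average cannot be merged or retracted from its previous result alone, so a custom aggregate function would need to keep both a running sum and a count. A minimal, hypothetical sketch (not a Flink class):

```java
/** Hypothetical retractable AVG: keeps (sum, count) because an average alone cannot be un-merged. */
final class RetractableAvg {
    private double sum;
    private long count;

    /** Accumulates one inserted value. */
    void add(double value) {
        sum += value;
        count++;
    }

    /** Removes a previously added value, e.g. for an UPDATE_BEFORE or DELETE row. */
    void retract(double value) {
        sum -= value;
        count--;
    }

    /** Returns null when no rows remain, mirroring SQL AVG over an empty set. */
    Double result() {
        return count == 0 ? null : sum / count;
    }
}
```

A SUM-only factory such as `Aggregations.SUM.getFactory()` has no place to store the count, which is one reason own aggregate classes may be needed.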





[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r877701752


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.io.Serializable;
+
+/**
+ * Custom abstract class for column aggregation.
+ *
+ * @param <T>
+ */
+public interface AggregateFunction<T> extends Serializable {
+    //     T aggregator;
+
+    T getResult();
+
+    default void init() {
+        reset();
+    }
+
+    void reset();
+
+    default void aggregate(Object value) {
+        aggregate(value, true);
+    }
+
+    void aggregate(Object value, boolean add);
+
+    void reset(Object value);

Review Comment:
   We should consider keeping some common methods to prepare for future features.
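A possible implementation of the proposed interface, to show what each method would have to do. The interface is copied from the diff above (minus the commented-out field); `LongSumAggregateFunction` is hypothetical, and treating `reset(Object)` as "seed the accumulator with an existing value" is an assumption, since the diff does not document its contract.

```java
import java.io.Serializable;

/** Copied from the interface proposed in this PR. */
interface AggregateFunction<T> extends Serializable {
    T getResult();

    default void init() {
        reset();
    }

    void reset();

    default void aggregate(Object value) {
        aggregate(value, true);
    }

    void aggregate(Object value, boolean add);

    void reset(Object value);
}

/** Hypothetical BIGINT sum showing how one implementation could satisfy that contract. */
final class LongSumAggregateFunction implements AggregateFunction<Long> {
    private static final long serialVersionUID = 1L;

    private Long sum;

    @Override
    public Long getResult() {
        return sum;
    }

    @Override
    public void reset() {
        sum = null; // stays null until the first non-null value arrives
    }

    @Override
    public void aggregate(Object value, boolean add) {
        if (value == null) {
            return; // null inputs do not change the sum
        }
        long v = (Long) value; // assumes the column is BIGINT
        long base = sum == null ? 0L : sum;
        sum = add ? base + v : base - v;
    }

    /** Assumption: seeds the accumulator with an existing value, e.g. an already merged row. */
    @Override
    public void reset(Object value) {
        sum = (Long) value;
    }
}
```

Keeping `aggregate(Object, boolean)` as the single abstract update method lets retraction (`add = false`) reuse the same code path as insertion.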





[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1132373009

   So the problem is solved?




[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r884370488


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for partial update. */
+public class AggregationITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+
+        String ddl1 =
+                "CREATE TABLE IF NOT EXISTS T3 ( "
+                        + " a STRING, "
+                        + " b BIGINT, "
+                        + " c INT, "
+                        + " PRIMARY KEY (a) NOT ENFORCED )"
+                        + " WITH ("
+                        + " 'merge-engine'='aggregation' ,"
+                        + " 'b.aggregate-function'='sum' ,"
+                        + " 'c.aggregate-function'='sum' "
+                        + " );";
+        String ddl2 =
+                "CREATE TABLE IF NOT EXISTS T4 ( "
+                        + " a STRING,"
+                        + " b INT,"
+                        + " c DOUBLE,"
+                        + " PRIMARY KEY (a, b) NOT ENFORCED )"
+                        + " WITH ("
+                        + " 'merge-engine'='aggregation',"
+                        + " 'c.aggregate-function' = 'sum'"
+                        + " );";
+        String ddl3 =
+                "CREATE TABLE IF NOT EXISTS T5 ( "
+                        + " a STRING,"
+                        + " b INT,"
+                        + " c DOUBLE,"
+                        + " PRIMARY KEY (a) NOT ENFORCED )"
+                        + " WITH ("
+                        + " 'merge-engine'='aggregation',"
+                        + " 'b.aggregate-function' = 'sum'"
+                        + " );";
+        List<String> lists = new ArrayList<>();
+        lists.add(ddl1);
+        lists.add(ddl2);
+        lists.add(ddl3);
+        return lists;
+    }
+
+    @Test
+    public void testCreateAggregateFunction() throws ExecutionException, InterruptedException {
+        List<Row> result;
+
+        // T5
+        try {
+            bEnv.executeSql("INSERT INTO T5 VALUES " + "('pk1',1, 2.0), " + "('pk1',1, 2.0)")
+                    .await();
+            throw new AssertionError("create table T5 should failed");
+        } catch (IllegalArgumentException e) {
+            assert ("should  set aggregate function for every column not part of primary key"
+                    .equals(e.getLocalizedMessage()));
+        }
+    }
+
+    @Test
+    public void testMergeInMemory() throws ExecutionException, InterruptedException {
+        List<Row> result;
+        // T3
+        bEnv.executeSql("INSERT INTO T3 VALUES " + "('pk1',1, 2), " + "('pk1',1, 2)").await();
+        result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 2L, 4));
+
+        // T4
+        bEnv.executeSql("INSERT INTO T4 VALUES " + "('pk1',1, 2.0), " + "('pk1',1, 2.0)").await();
+        result = iteratorToList(bEnv.from("T4").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 1, 4.0));
+    }
+
+    @Test
+    public void testMergeRead() throws ExecutionException, InterruptedException {
+        List<Row> result;
+        // T3
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 2)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',1, 4)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1',2, 0)").await();
+        result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 4L, 6));
+
+        // T4
+        bEnv.executeSql("INSERT INTO T4 VALUES ('pk1',1, 2.0)").await();
+        bEnv.executeSql("INSERT INTO T4 VALUES ('pk1',1, 4.0)").await();
+        bEnv.executeSql("INSERT INTO T4 VALUES ('pk1',1, 0.0)").await();
+        result = iteratorToList(bEnv.from("T4").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of("pk1", 1, 6.0));
+    }
+
+    @Test
+    public void testMergeCompaction() throws ExecutionException, InterruptedException {
+        List<Row> result;
+
+        // T3
+        // Wait compaction
+        bEnv.executeSql("ALTER TABLE T3 SET ('commit.force-compact'='true')");
+
+        // key pk1
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 3, 1)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 5)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk1', 4, 6)").await();
+
+        // key pk2
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 6,7)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 9,0)").await();
+        bEnv.executeSql("INSERT INTO T3 VALUES ('pk2', 4,4)").await();
+
+        result = iteratorToList(bEnv.from("T3").execute().collect());
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of("pk1", 11L, 12), Row.of("pk2", 19L, 11));
+
+        // T4
+        // Wait compaction
+        bEnv.executeSql("ALTER TABLE T4 SET ('commit.force-compact'='true')");
+
+        // key pk1_3
+        bEnv.executeSql("INSERT INTO T4 VALUES ('pk1', 3, 1.0)").await();
+        // key pk1_4
+        bEnv.executeSql("INSERT INTO T4 VALUES ('pk1', 4, 5.0)").await();
+        bEnv.executeSql("INSERT INTO T4 VALUES ('pk1', 4, 6.0)").await();
+        // key pk2_4
+        bEnv.executeSql("INSERT INTO T4 VALUES ('pk2', 4,4.0)").await();
+        // key pk2_2
+        bEnv.executeSql("INSERT INTO T4 VALUES ('pk2', 2,7.0)").await();
+        bEnv.executeSql("INSERT INTO T4 VALUES ('pk2', 2,0)").await();
+
+        result = iteratorToList(bEnv.from("T4").execute().collect());
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of("pk1", 3, 1.0),
+                        Row.of("pk1", 4, 11.0),
+                        Row.of("pk2", 4, 4.0),
+                        Row.of("pk2", 2, 7.0));
+    }
+
+    @Test
+    public void myTest() throws Exception {

Review Comment:
   This is not a valid test.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ColumnAggregateFunctionFactory.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** Factory for creating {@link ColumnAggregateFunction}s. */
+public class ColumnAggregateFunctionFactory {
+    /**
+     * Determine the column aggregation function .
+     *
+     * @param kind the kind of aggregation
+     * @param typeAt the type of the column
+     * @return the column aggregation function
+     */
+    public static ColumnAggregateFunction<?> getColumnAggregateFunction(
+            AggregationKind kind, LogicalType typeAt) {
+        switch (kind) {
+            case Sum:
+                return SumColumnAggregateFunctionFactory.getColumnAggregateFunction(typeAt);
+            case Avg:
+                return AvgColumnAggregateFunctionFactory.getColumnAggregateFunction(typeAt);
+            case Max:
+                return MaxColumnAggregateFunctionFactory.getColumnAggregateFunction(typeAt);
+            case Min:
+                return MinColumnAggregateFunctionFactory.getColumnAggregateFunction(typeAt);
+            default:
+                throw new IllegalArgumentException("Aggregation kind " + kind + " not supported");
+        }
+    }
+
+    /** AggregateKind is Sum. Determine the column aggregation function . */
+    private static class SumColumnAggregateFunctionFactory {
+        static SumColumnAggregateFunction<?> getColumnAggregateFunction(LogicalType type) {
+            switch (type.getTypeRoot()) {
+                case CHAR:
+                case VARCHAR:
+                case BOOLEAN:
+                case BINARY:
+                case VARBINARY:
+                case DECIMAL:
+                case TINYINT:
+                case SMALLINT:

Review Comment:
   These types can also be supported.
   
   The internal class for `DECIMAL` is `DecimalData`, and the `DecimalDataUtils` class provides methods such as `add` and `compare`.
   
   The internal class for `TINYINT` is `byte`, and for `SMALLINT` it is `short`, so you can support them easily.
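A minimal sketch of these extra `SUM` variants follows. `SumFunction` is a hypothetical, simplified stand-in for the PR's `ColumnAggregateFunction`: it keeps only the `reset` / `aggregate` / `getResult` methods seen in the diff. `java.math.BigDecimal` replaces Flink's `DecimalData` so the block runs without Flink on the classpath. Null inputs are skipped, as standard SQL `SUM` does.

```java
import java.math.BigDecimal;

/** Hypothetical, simplified stand-in for the PR's ColumnAggregateFunction. */
interface SumFunction<T> {
    void reset();

    void aggregate(Object value);

    T getResult();
}

/** SUM over TINYINT, whose internal representation is byte. */
class ByteSum implements SumFunction<Byte> {
    private Byte sum;

    public void reset() {
        sum = null;
    }

    public void aggregate(Object value) {
        if (value == null) {
            return; // null inputs are skipped, as in SQL SUM
        }
        byte v = (Byte) value;
        // byte + byte is computed as int; the cast back to byte wraps on overflow.
        sum = (sum == null) ? v : (byte) (sum + v);
    }

    public Byte getResult() {
        return sum;
    }
}

/** SUM over SMALLINT, whose internal representation is short. */
class ShortSum implements SumFunction<Short> {
    private Short sum;

    public void reset() {
        sum = null;
    }

    public void aggregate(Object value) {
        if (value == null) {
            return;
        }
        short v = (Short) value;
        sum = (sum == null) ? v : (short) (sum + v);
    }

    public Short getResult() {
        return sum;
    }
}

/** SUM over DECIMAL; BigDecimal stands in for Flink's DecimalData in this sketch. */
class DecimalSum implements SumFunction<BigDecimal> {
    private BigDecimal sum;

    public void reset() {
        sum = null;
    }

    public void aggregate(Object value) {
        if (value == null) {
            return;
        }
        BigDecimal v = (BigDecimal) value;
        // In the real code this would go through DecimalDataUtils#add with the
        // column's precision and scale; precision overflow checks are omitted here.
        sum = (sum == null) ? v : sum.add(v);
    }

    public BigDecimal getResult() {
        return sum;
    }
}
```

Note that the `byte` and `short` variants wrap on overflow because of the narrowing casts.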



##########
docs/content/docs/development/create-table.md:
##########
@@ -268,3 +268,45 @@ For example, the inputs:
 
 Output: 
 - <1, 25.2, 20, 'This is a book'>
+
+## Aggregation Update
+
+You can configure partial update from options:

Review Comment:
   > partial update from options
   
   I guess you're copying this from the partial update merge engine. Change this description.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationKind.java:
##########
@@ -0,0 +1,44 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.util.Locale;
+
+/** Aggregate kinds. */
+public enum AggregationKind {
+    Sum,
+    Max,
+    Min,
+    Avg;

Review Comment:
   Enum constants should be upper-case (e.g. `SUM`, `MAX`). I'm not sure whether checkstyle enforces this, but it seems to be the convention in Flink's code base.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateMergeFunction.java:
##########
@@ -0,0 +1,165 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record,
+ * aggregate specifies field on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregateMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<ColumnAggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+    private final Map<String, AggregationKind> aggregationKindMap;
+
+    public AggregateMergeFunction(
+            RowType primaryKeyType,
+            RowType rowType,
+            Map<String, AggregationKind> aggregationKindMap) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregationKindMap = aggregationKindMap;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            ColumnAggregateFunction<?> f = null;
+            if (aggregationKindMap.containsKey(rowNames.get(i))) {
+                f =
+                        ColumnAggregateFunctionFactory.getColumnAggregateFunction(
+                                aggregationKindMap.get(rowNames.get(i)), rowType.getTypeAt(i));
+            } else {
+                if (!isPrimaryKey[i]) {
+                    throw new IllegalArgumentException(
+                            "should  set aggregate function for every column not part of primary key");
+                }
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            ColumnAggregateFunction<?> f = aggregateFunctions.get(i);
+            if (isPrimaryKey[i]) {
+                // primary key
+                if (currentField != null) {
+                    row.setField(i, currentField);
+                }
+            } else {
+                if (f != null) {
+                    f.reset();
+                    Object oldValue = row.getField(i);
+                    if (oldValue != null) {
+                        f.aggregate(oldValue);
+                    }
+                    switch (value.getRowKind()) {
+                        case INSERT:
+                            f.aggregate(currentField);
+                            break;
+                        case DELETE:
+                        case UPDATE_AFTER:
+                        case UPDATE_BEFORE:
+                        default:
+                            throw new UnsupportedOperationException(
+                                    "Unsupported row kind: " + row.getRowKind());
+                    }
+                    Object result = f.getResult();
+                    if (result != null) {
+                        row.setField(i, result);

Review Comment:
   The value of `row` is only inspected when `AggregateMergeFunction#getValue` is called. You've already stored the aggregated results in `f`, so why reset it every time and aggregate twice?
   
   My suggestion:
   * In `AggregateMergeFunction#reset`, reset every aggregate function.
   * In `AggregateMergeFunction#add`, aggregate each field into the corresponding function. Current results are now stored in the function.
   * In `AggregateMergeFunction#getValue`, move aggregated results from functions into the row.
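The three steps above can be sketched as below. This is a hypothetical, simplified model rather than the PR's class: rows are plain `Object[]` arrays instead of `RowData`, row kinds are ignored, and `LongSum` stands in for a real column aggregate function. Each function is reset once per key and accumulates across `add` calls, so nothing is aggregated twice and `getValue` only copies results.

```java
/** Hypothetical per-column aggregate with the same three methods as the PR's interface. */
interface ColumnAggregator {
    void reset();

    void aggregate(Object value);

    Object getResult();
}

/** Minimal SUM over BIGINT values, skipping nulls. */
class LongSum implements ColumnAggregator {
    private Long sum;

    public void reset() {
        sum = null;
    }

    public void aggregate(Object value) {
        if (value == null) {
            return;
        }
        sum = (sum == null) ? (Long) value : sum + (Long) value;
    }

    public Object getResult() {
        return sum;
    }
}

/** Hypothetical merge function following the reset / add / getValue split. */
class SketchAggregateMergeFunction {
    private final boolean[] isPrimaryKey;
    private final ColumnAggregator[] functions; // null for primary-key columns
    private final Object[] keyFields;

    SketchAggregateMergeFunction(boolean[] isPrimaryKey, ColumnAggregator[] functions) {
        this.isPrimaryKey = isPrimaryKey;
        this.functions = functions;
        this.keyFields = new Object[isPrimaryKey.length];
    }

    /** Called once per key: reset every aggregate function. */
    void reset() {
        for (int i = 0; i < functions.length; i++) {
            keyFields[i] = null;
            if (!isPrimaryKey[i]) {
                functions[i].reset();
            }
        }
    }

    /** Called once per record: fold each non-key field into its function. */
    void add(Object[] value) {
        for (int i = 0; i < value.length; i++) {
            if (isPrimaryKey[i]) {
                keyFields[i] = value[i];
            } else {
                functions[i].aggregate(value[i]);
            }
        }
    }

    /** Called when the merged row is needed: move results from the functions into a row. */
    Object[] getValue() {
        Object[] row = new Object[functions.length];
        for (int i = 0; i < row.length; i++) {
            row[i] = isPrimaryKey[i] ? keyFields[i] : functions[i].getResult();
        }
        return row;
    }
}
```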



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationKind.java:
##########
@@ -0,0 +1,44 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.util.Locale;
+
+/** Aggregate kinds. */
+public enum AggregationKind {
+    Sum,
+    Max,
+    Min,
+    Avg;
+
+    public static AggregationKind fromString(String name) {
+        switch (name.toLowerCase(Locale.ROOT)) {
+            case "sum":
+                return Sum;
+            case "max":
+                return Max;
+            case "min":
+                return Min;
+            case "avg":
+                return Avg;
+            default:
+                throw new IllegalArgumentException("Unknown aggregation kind: " + name);
+        }
+    }

Review Comment:
   No need for this method. Once the constants are upper-cased, callers can call `AggregationKind.valueOf(name.toUpperCase(Locale.ROOT))` directly.
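With upper-case constants, as requested in the earlier naming comment, the `fromString` helper reduces to a `valueOf` call. A minimal sketch (the `AggregationKindParsing` class is hypothetical). `Locale.ROOT` keeps the case mapping independent of the JVM's default locale, and unknown names still fail fast because `Enum.valueOf` throws `IllegalArgumentException`.

```java
import java.util.Locale;

/** Aggregation kinds with upper-case constants, following Flink's convention. */
enum AggregationKind {
    SUM,
    MAX,
    MIN,
    AVG
}

class AggregationKindParsing {
    /** Parses a user-supplied option value such as "sum" or "Max". */
    static AggregationKind parse(String name) {
        // Locale.ROOT avoids locale-sensitive case mapping (e.g. the Turkish dotted I).
        return AggregationKind.valueOf(name.toUpperCase(Locale.ROOT));
    }
}
```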



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateMergeFunction.java:
##########
@@ -0,0 +1,165 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record,
+ * aggregate specifies field on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregateMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<ColumnAggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+    private final Map<String, AggregationKind> aggregationKindMap;
+
+    public AggregateMergeFunction(
+            RowType primaryKeyType,
+            RowType rowType,
+            Map<String, AggregationKind> aggregationKindMap) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregationKindMap = aggregationKindMap;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            ColumnAggregateFunction<?> f = null;
+            if (aggregationKindMap.containsKey(rowNames.get(i))) {
+                f =
+                        ColumnAggregateFunctionFactory.getColumnAggregateFunction(
+                                aggregationKindMap.get(rowNames.get(i)), rowType.getTypeAt(i));
+            } else {
+                if (!isPrimaryKey[i]) {
+                    throw new IllegalArgumentException(
+                            "should  set aggregate function for every column not part of primary key");
+                }
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            ColumnAggregateFunction<?> f = aggregateFunctions.get(i);
+            if (isPrimaryKey[i]) {
+                // primary key
+                if (currentField != null) {
+                    row.setField(i, currentField);
+                }
+            } else {
+                if (f != null) {

Review Comment:
   `f` should never be null in this case. Use `Preconditions.checkNotNull(f, "Aggregate function should never be null. This is unexpected.")` if you need to check this.
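Flink's `org.apache.flink.util.Preconditions.checkNotNull(reference, errorMessage)` returns the reference when it is non-null and otherwise throws a `NullPointerException` carrying the message. The sketch below uses the JDK's `java.util.Objects.requireNonNull`, which behaves the same way, so it runs without Flink. The `NonKeyColumnLookup` helper is hypothetical.

```java
import java.util.Objects;

class NonKeyColumnLookup {
    /** Returns the aggregate function for a non-primary-key column, failing loudly if it is missing. */
    static <F> F lookup(F[] functions, int i) {
        // Same intent as Preconditions.checkNotNull(f, "...") in Flink.
        return Objects.requireNonNull(
                functions[i], "Aggregate function should never be null. This is unexpected.");
    }
}
```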



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,206 @@
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for partial update. */
+public class AggregationITCase extends FileStoreTableITCase {

Review Comment:
   You now support other aggregate functions like `min` and `max`. Add tests for them. Remember that every change you make must come with a test.
   
   You also support different aggregate functions for different columns. Add a test case for this as well.
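For the mixed-function test case, one way to derive the expected rows is a tiny reference model that folds each non-key column with its own function. This is hypothetical test support, not part of the PR, and it assumes the merge engine applies each column's function independently across all records of a key. Here column 0 uses `sum`, column 1 `max`, and column 2 `min`.

```java
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical reference model: folds the non-key columns of one key, column by column. */
class ExpectedRowModel {
    /** rows must be non-empty; perColumn.get(c) merges column c. */
    static int[] merge(List<int[]> rows, List<BinaryOperator<Integer>> perColumn) {
        int[] acc = rows.get(0).clone(); // copy so the input rows are not modified
        for (int r = 1; r < rows.size(); r++) {
            for (int c = 0; c < acc.length; c++) {
                acc[c] = perColumn.get(c).apply(acc[c], rows.get(r)[c]);
            }
        }
        return acc;
    }
}
```

For the inputs (1, 2, 3), (4, 6, 0), (5, 1, 7) under one key, the model yields (10, 6, 0).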



##########
docs/content/docs/development/create-table.md:
##########
@@ -268,3 +268,45 @@ For example, the inputs:
 
 Output: 
 - <1, 25.2, 20, 'This is a book'>
+
+## Aggregation Update
+
+You can configure partial update from options:
+
+```sql
+CREATE TABLE MyTable (
+  a STRING,
+  b INT,
+  c INT,
+  PRIMARY KEY (a) NOT ENFORCED 
+) WITH (
+      'merge-engine' = 'aggregation',
+      'b.aggregate-function' = 'sum',
+      'c.aggregate-function' = 'sum'
+);
+```
+{{< hint info >}}
+__Note:__Aggregate updates are only supported for tables with primary keys.
+{{< /hint >}}
+
+{{< hint info >}}
+__Note:__Aggregate updates do not support streaming consumption.

Review Comment:
   Add a test case for streaming consumption. It should verify that an exception is thrown telling the user that streaming consumption is not supported.
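A minimal sketch of the kind of check such a test would exercise. Everything here is hypothetical: the `validateStreamingRead` helper, its parameters, and the message are not from the PR, and where the real check should live is left open. The test then only has to assert that a streaming read of an aggregation table fails with a message naming the limitation.

```java
/** Hypothetical guard: tables using the aggregation merge engine cannot be consumed as a stream. */
class StreamingReadCheck {
    static void validateStreamingRead(String mergeEngine, boolean streaming) {
        if (streaming && "aggregation".equals(mergeEngine)) {
            throw new UnsupportedOperationException(
                    "Streaming consumption is not supported for tables with 'merge-engine'='aggregation'.");
        }
    }
}
```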



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateMergeFunction.java:
##########
@@ -0,0 +1,165 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record,
+ * aggregate specifies field on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregateMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<ColumnAggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+    private final Map<String, AggregationKind> aggregationKindMap;
+
+    public AggregateMergeFunction(
+            RowType primaryKeyType,
+            RowType rowType,
+            Map<String, AggregationKind> aggregationKindMap) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregationKindMap = aggregationKindMap;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            ColumnAggregateFunction<?> f = null;
+            if (aggregationKindMap.containsKey(rowNames.get(i))) {
+                f =
+                        ColumnAggregateFunctionFactory.getColumnAggregateFunction(
+                                aggregationKindMap.get(rowNames.get(i)), rowType.getTypeAt(i));
+            } else {
+                if (!isPrimaryKey[i]) {
+                    throw new IllegalArgumentException(
+                            "should  set aggregate function for every column not part of primary key");
+                }
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            ColumnAggregateFunction<?> f = aggregateFunctions.get(i);
+            if (isPrimaryKey[i]) {
+                // primary key
+                if (currentField != null) {
+                    row.setField(i, currentField);
+                }
+            } else {
+                if (f != null) {
+                    f.reset();
+                    Object oldValue = row.getField(i);
+                    if (oldValue != null) {
+                        f.aggregate(oldValue);
+                    }
+                    switch (value.getRowKind()) {
+                        case INSERT:
+                            f.aggregate(currentField);
+                            break;
+                        case DELETE:
+                        case UPDATE_AFTER:
+                        case UPDATE_BEFORE:
+                        default:
+                            throw new UnsupportedOperationException(
+                                    "Unsupported row kind: " + row.getRowKind());
+                    }
+                    Object result = f.getResult();
+                    if (result != null) {

Review Comment:
   In SQL, most aggregate functions (`SUM`, `MIN`, `MAX`, `AVG`) skip null input values and return null only when every input is null. There is no need to filter out null values here, but you may need to deal with null values in your aggregate functions.
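In standard SQL, `SUM` skips null inputs and returns null only if no non-null input was seen. Here is a sketch of an INT sum that handles nulls itself, so the merge function can copy `getResult()` into the row without filtering. The class name is hypothetical.

```java
/** Hypothetical SUM over INT columns that handles nulls itself instead of relying on the caller. */
class NullAwareIntSum {
    private Integer sum; // null means "no non-null input seen yet"

    void reset() {
        sum = null;
    }

    void aggregate(Integer value) {
        if (value == null) {
            return; // SQL SUM ignores null inputs
        }
        sum = (sum == null) ? value : sum + value;
    }

    Integer getResult() {
        return sum;
    }
}
```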



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1139717042

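   > @tsreaper For the per-column functions, I extended the column function interface for each aggregation type, and each data type implements the column function interface of the corresponding type. This implementation hierarchy ensures we have all the column aggregation interfaces.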
   
   Now we can specify different aggregate functions for different columns, but I haven't implemented any kind other than sum.
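The hierarchy described above (one interface per aggregation kind extending a common column interface, and one implementation per data type) might look roughly like the sketch below. The interface names mirror `ColumnAggregateFunction` and `SumColumnAggregateFunction` from the diff, but their bodies, the concrete classes, and the string-keyed `SumFactory` (a stand-in for switching on `LogicalType#getTypeRoot()`) are assumptions. Only `SUM` over two types is shown.

```java
/** Common per-column interface (simplified; modelled on the PR's ColumnAggregateFunction). */
interface ColumnAggregateFunction<T> {
    void reset();

    void aggregate(Object value);

    T getResult();
}

/** One sub-interface per aggregation kind; only SUM is shown. */
interface SumColumnAggregateFunction<T> extends ColumnAggregateFunction<T> {}

/** One implementation per data type: SUM over INT. */
class IntSumColumnAggregateFunction implements SumColumnAggregateFunction<Integer> {
    private Integer sum;

    public void reset() {
        sum = null;
    }

    public void aggregate(Object value) {
        if (value != null) {
            sum = (sum == null) ? (Integer) value : sum + (Integer) value;
        }
    }

    public Integer getResult() {
        return sum;
    }
}

/** SUM over DOUBLE. */
class DoubleSumColumnAggregateFunction implements SumColumnAggregateFunction<Double> {
    private Double sum;

    public void reset() {
        sum = null;
    }

    public void aggregate(Object value) {
        if (value != null) {
            sum = (sum == null) ? (Double) value : sum + (Double) value;
        }
    }

    public Double getResult() {
        return sum;
    }
}

/** Picks the implementation by type name (a stand-in for switching on the type root). */
class SumFactory {
    static SumColumnAggregateFunction<?> forType(String typeRoot) {
        switch (typeRoot) {
            case "INTEGER":
                return new IntSumColumnAggregateFunction();
            case "DOUBLE":
                return new DoubleSumColumnAggregateFunction();
            default:
                throw new IllegalArgumentException("SUM is not supported for type " + typeRoot);
        }
    }
}
```

Adding `MAX` would then mean a `MaxColumnAggregateFunction<T>` sub-interface plus one class per supported type, which matches the per-kind factories visible in `ColumnAggregateFunctionFactory`.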




[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r874650119


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,128 @@
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.api.java.aggregation.AggregationFunction;
+import org.apache.flink.api.java.aggregation.AggregationFunctionFactory;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+    private final RowType primaryKeyType;
+    private final RowType rowType;
+    private final Set<String> primaryKeyNames;
+    private final ArrayList<String> rowNames;
+
+    private final ArrayList<AggregationFunction<Object>> types;

Review Comment:
   > `aggregateFunctions` might be a better name.
   
   Do you mean `flink-table-common/org.apache.flink.table.functions.AggregateFunction<T, ACC>`?
   Can I reuse any existing interface or implementation class, or do I need to define an aggregator interface and implementation classes for the various data types?





[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r875896707


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,84 @@
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for the aggregation merge engine. */
+public class AggregationITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS T3 ( "
+                        + " a STRING, "
+                        + " b INT, "
+                        + " c INT, "
+                        + " PRIMARY KEY (a) NOT ENFORCED )"
+                        + " WITH ("
+                        + " 'merge-engine'='aggregation' ,"
+                        + " 'b.aggregate-function'='sum' ,"
+                        + " 'c.aggregate-function'='sum' "

Review Comment:
   > What happens if I only set `'b.aggregate-function'='sum'`? Maybe an exception telling the user that they should set aggregate function for every column not part of primary key? Implement this and add this test case.
   
   I don't think it should throw an exception. Users sometimes don't care about the aggregation results of the other non-primary-key columns. In the latest version of the implementation, not setting `'c.aggregate-function'='sum'` makes column `c` null in the final result.





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r877680927


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<AggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+
+    private final Set<String> aggregateColumnNames;
+
+    public AggregationMergeFunction(
+            RowType primaryKeyType, RowType rowType, Set<String> aggregateColumnNames) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregateColumnNames = aggregateColumnNames;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        Arrays.fill(isPrimaryKey, false);
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            AggregateFunction<?> f = null;
+            if (aggregateColumnNames.contains(rowNames.get(i))) {
+                f = choiceRightAggregateFunction(rowType.getTypeAt(i).getDefaultConversion());
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    private AggregateFunction<?> choiceRightAggregateFunction(Class<?> c) {
+        AggregateFunction<?> f = null;
+        if (Double.class.equals(c)) {
+            f = new DoubleAggregateFunction();
+        } else if (Long.class.equals(c)) {
+            f = new LongAggregateFunction();
+        } else if (Integer.class.equals(c)) {
+            f = new IntegerAggregateFunction();
+        } else if (Float.class.equals(c)) {
+            f = new FloatAggregateFunction();
+        }
+        return f;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            AggregateFunction<?> f = aggregateFunctions.get(i);
+            if (isPrimaryKey[i]) {
+                // primary key
+                if (currentField != null) {
+                    row.setField(i, currentField);
+                }
+            } else {
+                if (f != null) {
+                    f.reset();
+                    Object oldValue = row.getField(i);
+                    if (oldValue != null) {
+                        f.aggregate(oldValue);
+                    }
+                    switch (value.getRowKind()) {
+                        case INSERT:
+                            f.aggregate(currentField);
+                            break;
+                        case DELETE:
+                            f.aggregate(currentField, false);
+                            break;
+                        case UPDATE_AFTER:
+                        case UPDATE_BEFORE:
+                        default:
+                            throw new UnsupportedOperationException(
+                                    "Unsupported row kind: " + value.getRowKind());

Review Comment:
   You can treat `UPDATE_BEFORE` as `DELETE` and `UPDATE_AFTER` as `INSERT`. They're actually different in some other ways but in our scenario they're the same.
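   A minimal sketch of this mapping, assuming a stand-in `Kind` enum that mirrors the four values of Flink's `org.apache.flink.types.RowKind` so it runs without Flink on the classpath; `RowKindMapping` and `isAccumulate` are hypothetical names, not code from the PR:

```java
public class RowKindMapping {

    // Stand-in for org.apache.flink.types.RowKind so the example is self-contained.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Returns true if the field value should be added to the aggregate,
    // false if it should be retracted from it.
    static boolean isAccumulate(Kind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return true;
            case UPDATE_BEFORE:
            case DELETE:
                return false;
            default:
                throw new UnsupportedOperationException("Unsupported row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        for (Kind k : Kind.values()) {
            System.out.println(k + " -> " + (isAccumulate(k) ? "aggregate" : "retract"));
        }
    }
}
```

   In the PR's `add` loop, the returned flag could be passed as the `add` argument of `f.aggregate(currentField, add)`, collapsing the per-kind `switch` into a single call.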





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r877715115


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateFunction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import java.io.Serializable;
+
+/**
+ * Custom abstraction for aggregating the values of a single column.
+ *
+ * @param <T> type of the aggregation result
+ */
+public interface AggregateFunction<T> extends Serializable {
+    //     T aggregator;
+
+    T getResult();
+
+    default void init() {
+        reset();
+    }
+
+    void reset();
+
+    default void aggregate(Object value) {
+        aggregate(value, true);
+    }
+
+    void aggregate(Object value, boolean add);
+
+    void reset(Object value);

Review Comment:
   Oops, I meant to comment under the `init` method.





[GitHub] [flink-table-store] ajian2002 commented on pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#issuecomment-1139714404

   @tsreaper I implemented a column aggregation function interface that is applied per column. Each aggregation kind extends this column aggregation function interface, and each data type implements the interface of the corresponding kind. These three layers give us enough flexibility.
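   The three layers described above could look roughly like this. Only the name `ColumnAggregateFunction` comes from the PR; its methods are modeled on the earlier `AggregateFunction` interface, and `SumColumnAggregateFunction` / `LongSumColumnAggregateFunction` are hypothetical names for one kind and one type:

```java
import java.io.Serializable;

public class ThreeLayerDemo {

    // Layer 1: contract shared by every per-column aggregate function.
    interface ColumnAggregateFunction<T> extends Serializable {
        T getResult();

        void reset();

        void aggregate(Object value, boolean add);

        default void aggregate(Object value) {
            aggregate(value, true);
        }
    }

    // Layer 2: one interface per aggregation kind (hypothetical name).
    interface SumColumnAggregateFunction<T> extends ColumnAggregateFunction<T> {}

    // Layer 3: one class per data type for that kind (hypothetical name).
    static final class LongSumColumnAggregateFunction
            implements SumColumnAggregateFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Long sum;

        @Override
        public Long getResult() {
            return sum;
        }

        @Override
        public void reset() {
            sum = null;
        }

        @Override
        public void aggregate(Object value, boolean add) {
            if (value == null) {
                return; // null inputs leave the sum unchanged
            }
            long v = (Long) value;
            long current = sum == null ? 0L : sum;
            sum = add ? current + v : current - v;
        }
    }

    public static void main(String[] args) {
        ColumnAggregateFunction<Long> f = new LongSumColumnAggregateFunction();
        f.reset();
        f.aggregate(3L);
        f.aggregate(4L);
        f.aggregate(2L, false); // retract 2
        System.out.println(f.getResult()); // 5
    }
}
```

   A factory such as the PR's `ColumnAggregateFunctionFactory` would then map an (aggregation kind, column type) pair to the matching layer-3 class.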




[GitHub] [flink-table-store] ajian2002 commented on a diff in pull request #121: [FLINK-27626] Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
ajian2002 commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r885242104


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregateMergeFunction.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record,
+ * aggregate specifies field on merge.
+ */
+@SuppressWarnings("checkstyle:RegexpSingleline")
+public class AggregateMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private final RowType rowType;
+    private final ArrayList<ColumnAggregateFunction<?>> aggregateFunctions;
+    private final boolean[] isPrimaryKey;
+    private final RowType primaryKeyType;
+    private transient GenericRowData row;
+    private final Map<String, AggregationKind> aggregationKindMap;
+
+    public AggregateMergeFunction(
+            RowType primaryKeyType,
+            RowType rowType,
+            Map<String, AggregationKind> aggregationKindMap) {
+        this.primaryKeyType = primaryKeyType;
+        this.rowType = rowType;
+        this.aggregationKindMap = aggregationKindMap;
+
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        this.getters = new RowData.FieldGetter[fieldTypes.size()];
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            getters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+        }
+
+        this.isPrimaryKey = new boolean[this.getters.length];
+        List<String> rowNames = rowType.getFieldNames();
+        for (String primaryKeyName : primaryKeyType.getFieldNames()) {
+            isPrimaryKey[rowNames.indexOf(primaryKeyName)] = true;
+        }
+
+        this.aggregateFunctions = new ArrayList<>(rowType.getFieldCount());
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            ColumnAggregateFunction<?> f = null;
+            if (aggregationKindMap.containsKey(rowNames.get(i))) {
+                f =
+                        ColumnAggregateFunctionFactory.getColumnAggregateFunction(
+                                aggregationKindMap.get(rowNames.get(i)), rowType.getTypeAt(i));
+            } else {
+                if (!isPrimaryKey[i]) {
+                    throw new IllegalArgumentException(
+                            "An aggregate function must be set for every column that is not part of the primary key.");
+                }
+            }
+            aggregateFunctions.add(f);
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++) {
+            Object currentField = getters[i].getFieldOrNull(value);
+            ColumnAggregateFunction<?> f = aggregateFunctions.get(i);
+            if (isPrimaryKey[i]) {
+                // primary key
+                if (currentField != null) {
+                    row.setField(i, currentField);
+                }
+            } else {
+                if (f != null) {
+                    f.reset();
+                    Object oldValue = row.getField(i);
+                    if (oldValue != null) {
+                        f.aggregate(oldValue);
+                    }
+                    switch (value.getRowKind()) {
+                        case INSERT:
+                            f.aggregate(currentField);
+                            break;
+                        case DELETE:
+                        case UPDATE_AFTER:
+                        case UPDATE_BEFORE:
+                        default:
+                            throw new UnsupportedOperationException(
+                                    "Unsupported row kind: " + value.getRowKind());
+                    }
+                    Object result = f.getResult();
+                    if (result != null) {
+                        row.setField(i, result);

Review Comment:
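   > The value of `row` is only checked when `AggregateMergeFunction#getValue` is called. You have already stored the aggregation result in `f`, so why reset it and aggregate twice every time?
   > 
   > My suggestion:
   > 
   > * In `AggregateMergeFunction#reset`, reset every aggregate function.
   > * In `AggregateMergeFunction#add`, aggregate each field into its corresponding function. The current result is then stored in the function.
   > * In `AggregateMergeFunction#getValue`, move the aggregated results from the functions into the row.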
   
   Each aggregation function `f` belongs to a column. If the result is not stored in `row` and the function is not reset, then the next time I call `add`, the aggregation result for another primary key on the same column will be wrong.
   Can your approach solve this problem?
   
   I don't know when `AggregateMergeFunction#reset` and `AggregateMergeFunction#getValue` are called, and breakpoints in them are not hit when running the tests. Let me know if you're sure this approach is okay and I'll start coding.
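   A sketch of the reviewer's suggestion, assuming (as the suggestion implies) that the merge tree calls `reset()` once before the records of each key, `add()` once per record of that key, and `getValue()` after the last one. Under that assumption the per-column functions never see two keys at once, so they can hold the running result. `Object[]` stands in for `RowData`, and all class names here are hypothetical:

```java
import java.util.Arrays;

public class ResetAddGetValueSketch {

    // Minimal per-column aggregator (hypothetical; stands in for ColumnAggregateFunction).
    interface ColumnAggregator {
        void reset();

        void aggregate(Object value);

        Object getResult();
    }

    static final class LongSum implements ColumnAggregator {
        private Long sum;

        public void reset() {
            sum = null;
        }

        public void aggregate(Object value) {
            if (value != null) {
                sum = (sum == null ? 0L : sum) + (Long) value;
            }
        }

        public Object getResult() {
            return sum;
        }
    }

    // Merges all records of ONE key; Object[] stands in for RowData.
    static final class AggregateMerger {
        private final boolean[] isPrimaryKey;
        private final ColumnAggregator[] aggregators; // null for primary-key columns
        private Object[] keyFields;

        AggregateMerger(boolean[] isPrimaryKey, ColumnAggregator[] aggregators) {
            this.isPrimaryKey = isPrimaryKey;
            this.aggregators = aggregators;
        }

        // Called once per key, before its first record: clear all state.
        void reset() {
            keyFields = new Object[isPrimaryKey.length];
            for (ColumnAggregator a : aggregators) {
                if (a != null) {
                    a.reset();
                }
            }
        }

        // Called for every record of the current key: only accumulates.
        void add(Object[] record) {
            for (int i = 0; i < record.length; i++) {
                if (isPrimaryKey[i]) {
                    if (record[i] != null) {
                        keyFields[i] = record[i];
                    }
                } else if (aggregators[i] != null) {
                    aggregators[i].aggregate(record[i]);
                }
            }
        }

        // Called once per key, after its last record: copy results into a row.
        Object[] getValue() {
            Object[] row = keyFields.clone();
            for (int i = 0; i < row.length; i++) {
                if (aggregators[i] != null) {
                    row[i] = aggregators[i].getResult();
                }
            }
            return row;
        }
    }

    public static void main(String[] args) {
        AggregateMerger m = new AggregateMerger(
                new boolean[] {true, false}, new ColumnAggregator[] {null, new LongSum()});
        m.reset();
        m.add(new Object[] {"pk1", 1L});
        m.add(new Object[] {"pk1", 2L});
        System.out.println(Arrays.toString(m.getValue())); // [pk1, 3]
        m.reset(); // next key: state from pk1 is cleared
        m.add(new Object[] {"pk2", 10L});
        System.out.println(Arrays.toString(m.getValue())); // [pk2, 10]
    }
}
```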





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r871965386


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++)
+        {
+            Object currentField = getters[i].getFieldOrNull(value);
+            Object oldValue = row.getField(i);
+            Object result = sum(oldValue, currentField);
+            if (result != null)
+            {
+                row.setField(i, result);
+            }
+        }
+    }
+
+    private Object sum(Object oldValue, Object currentField) {
+        if (currentField == null)
+        {
+            return null;
+        }
+        if (oldValue == null)
+        {
+            return currentField;
+        }
+        if (oldValue instanceof Integer && currentField instanceof Integer)
+        {
+            return Integer.sum((Integer) oldValue, (Integer) currentField);
+        }
+        else if (oldValue instanceof Long && currentField instanceof Long)
+        {
+            return Long.sum((Long) oldValue, (Long) currentField);
+        }

Review Comment:
   These `instanceof` checks are performed on a per-record basis and have a big impact on performance. That's why we use `RowData.FieldGetter` instead of `instanceof` to fetch columns out of a row.
   
   A better approach is to create an `Aggregator` class just like `RowData.FieldGetter` and in the constructor of `AggregationMergeFunction` we choose `Aggregator` for each column according to their types. So on a per-record path we can use the `Aggregator` directly without these checks.
   
   An even better approach is to use Flink's existing code generation system, but that might be too complex for your first contribution, considering how it is used and the fact that the internal system of the Flink table planner is now awkward to call from the outside due to the Scala-free change in Flink 1.15. If you're interested, see `AggWithoutKeysCodeGenerator` in Flink.
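   A minimal sketch of choosing one aggregator per column up front, in the spirit of `RowData.FieldGetter`. `TypeRoot` is a stand-in for Flink's `LogicalTypeRoot` and `Aggregator` is a hypothetical interface; unsupported types fail in the constructor rather than on the per-record path:

```java
import java.io.Serializable;

public class PerColumnAggregatorSketch {

    // Stand-in for Flink's LogicalTypeRoot (only the roots used here).
    enum TypeRoot { INTEGER, BIGINT, FLOAT, DOUBLE, VARCHAR }

    // Chosen once per column, like RowData.FieldGetter; serializable because
    // the merge function that holds it is serialized.
    @FunctionalInterface
    interface Aggregator extends Serializable {
        Object sum(Object accumulator, Object value);
    }

    static Aggregator createSumAggregator(TypeRoot type) {
        switch (type) {
            case INTEGER:
                return (acc, v) -> (Integer) acc + (Integer) v;
            case BIGINT:
                return (acc, v) -> (Long) acc + (Long) v;
            case FLOAT:
                return (acc, v) -> (Float) acc + (Float) v;
            case DOUBLE:
                return (acc, v) -> (Double) acc + (Double) v;
            default:
                // Fail fast at construction time instead of silently returning null.
                throw new IllegalArgumentException("SUM is not supported for type " + type);
        }
    }

    private final Aggregator[] aggregators;

    PerColumnAggregatorSketch(TypeRoot[] columnTypes) {
        aggregators = new Aggregator[columnTypes.length];
        for (int i = 0; i < columnTypes.length; i++) {
            aggregators[i] = createSumAggregator(columnTypes[i]);
        }
    }

    // Per-record path: no type checks, just an array lookup and one call.
    Object accumulate(int column, Object accumulator, Object value) {
        if (value == null) {
            return accumulator;
        }
        if (accumulator == null) {
            return value;
        }
        return aggregators[column].sum(accumulator, value);
    }

    public static void main(String[] args) {
        PerColumnAggregatorSketch s =
                new PerColumnAggregatorSketch(new TypeRoot[] {TypeRoot.INTEGER, TypeRoot.DOUBLE});
        System.out.println(s.accumulate(0, 1, 2)); // 3
        System.out.println(s.accumulate(1, 1.5, 2.0)); // 3.5
        try {
            new PerColumnAggregatorSketch(new TypeRoot[] {TypeRoot.VARCHAR});
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // SUM is not supported for type VARCHAR
        }
    }
}
```

   Null handling lives in one place (`accumulate`) instead of being repeated in every type branch.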



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java:
##########
@@ -196,18 +197,24 @@ public static FileStoreImpl createWithPrimaryKey(
                                 .collect(Collectors.toList()));
 
         MergeFunction mergeFunction;
+        List<LogicalType> fieldTypes = rowType.getChildren();
+        RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
         switch (mergeEngine) {
             case DEDUPLICATE:
                 mergeFunction = new DeduplicateMergeFunction();
                 break;
             case PARTIAL_UPDATE:
-                List<LogicalType> fieldTypes = rowType.getChildren();
-                RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
                 for (int i = 0; i < fieldTypes.size(); i++) {
                     fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
                 }
                 mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
                 break;
+            case AGGREGATION:
+                for (int i = 0; i < fieldTypes.size(); i++) {
+                    fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+                }

Review Comment:
   Move this out of the `switch` statement as other branches also need this.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++)
+        {
+            Object currentField = getters[i].getFieldOrNull(value);
+            Object oldValue = row.getField(i);
+            Object result = sum(oldValue, currentField);
+            if (result != null)
+            {
+                row.setField(i, result);
+            }
+        }
+    }
+
+    private Object sum(Object oldValue, Object currentField) {
+        if (currentField == null)
+        {
+            return null;
+        }
+        if (oldValue == null)
+        {
+            return currentField;
+        }
+        if (oldValue instanceof Integer && currentField instanceof Integer)
+        {
+            return Integer.sum((Integer) oldValue, (Integer) currentField);
+        }
+        else if (oldValue instanceof Long && currentField instanceof Long)
+        {
+            return Long.sum((Long) oldValue, (Long) currentField);
+        }
+        else if (oldValue instanceof Double && currentField instanceof Double)
+        {
+            return Double.sum((Double) oldValue, (Double) currentField);
+        }
+        else if (oldValue instanceof Float && currentField instanceof Float)
+        {
+            return Float.sum((Float) oldValue, (Float) currentField);
+        }
+        else if (oldValue instanceof String && currentField instanceof String)

Review Comment:
   Flink has its own internal representations for some data types (mainly for performance). For example, the internal representation of `String` in Flink is `StringData`. I'm not sure whether this is documented anywhere; maybe @JingsongLi knows.





[GitHub] [flink-table-store] tsreaper commented on a diff in pull request #121: Introduce AggregatuibMergeFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r871987331


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {
+        for (int i = 0; i < getters.length; i++)
+        {
+            Object currentField = getters[i].getFieldOrNull(value);
+            Object oldValue = row.getField(i);
+            Object result = sum(oldValue, currentField);
+            if (result != null)
+            {
+                row.setField(i, result);
+            }
+        }
+    }
+
+    private Object sum(Object oldValue, Object currentField) {
+        if (currentField == null)
+        {
+            return null;
+        }
+        if (oldValue == null)
+        {
+            return currentField;
+        }
+        if (oldValue instanceof Integer && currentField instanceof Integer)
+        {
+            return Integer.sum((Integer) oldValue, (Integer) currentField);
+        }
+        else if (oldValue instanceof Long && currentField instanceof Long)
+        {
+            return Long.sum((Long) oldValue, (Long) currentField);
+        }
+        else if (oldValue instanceof Double && currentField instanceof Double)
+        {
+            return Double.sum((Double) oldValue, (Double) currentField);
+        }
+        else if (oldValue instanceof Float && currentField instanceof Float)
+        {
+            return Float.sum((Float) oldValue, (Float) currentField);
+        }
+        else if (oldValue instanceof String && currentField instanceof String)
+        {
+            return "null";
+        }
+        return null;

Review Comment:
   Throw an exception instead of returning null for types that do not support summing. It is better to tell the user explicitly what we do not support than to quietly do something they might not expect. It would also be better to perform this check in the constructor.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AggregationMergeFunction.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
+ */
+public class AggregationMergeFunction implements MergeFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public AggregationMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
+
+    @Override
+    public void reset() {
+        this.row = new GenericRowData(getters.length);
+    }
+
+    @Override
+    public void add(RowData value) {

Review Comment:
   `RowData` in Flink actually has four kinds (INSERT, UPDATE_BEFORE, UPDATE_AFTER and DELETE); you can get a row's kind with `RowData#getRowKind`. The current implementation does not support retracting values. For example, a streaming SQL job may insert ('pk1', 1) and ('pk1', 2) and then retract ('pk1', 1); the result should be ('pk1', 1 + 2 - 1 = 2). Because of this, it only supports the insert kind, so please check the row kind here.
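   A minimal sketch of the row-kind check described above, for a single summed column. `RowKindCheckSketch` is hypothetical, and its `RowKind` enum only mirrors the four constants of Flink's `org.apache.flink.types.RowKind` so the example runs without Flink. `add` rejects everything except INSERT, as the comment asks. `retractingSum` shows, for comparison only, the retraction semantics from the example (1 + 2 - 1 = 2).

```java
import java.util.List;

/** Hypothetical sketch; RowKind mirrors the constants of org.apache.flink.types.RowKind. */
final class RowKindCheckSketch {

    /** Stand-in for Flink's RowKind enum (same four constants). */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private long sum;

    void reset() {
        sum = 0L;
    }

    /** Insert-only accumulation: any other row kind is rejected explicitly. */
    void add(RowKind kind, long value) {
        if (kind != RowKind.INSERT) {
            throw new UnsupportedOperationException(
                    "Aggregation merge function only supports INSERT rows, but got " + kind);
        }
        sum += value;
    }

    long getValue() {
        return sum;
    }

    /** For comparison: a retraction-aware sum where UPDATE_BEFORE and DELETE subtract. */
    static long retractingSum(List<RowKind> kinds, List<Long> values) {
        if (kinds.size() != values.size()) {
            throw new IllegalArgumentException("kinds and values must have the same size");
        }
        long total = 0L;
        for (int i = 0; i < kinds.size(); i++) {
            switch (kinds.get(i)) {
                case INSERT:
                case UPDATE_AFTER:
                    total += values.get(i);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    total -= values.get(i);
                    break;
            }
        }
        return total;
    }
}
```

   Failing fast on non-insert rows keeps the early version honest about its limits. Supporting retraction later would mean subtracting on UPDATE_BEFORE/DELETE rows, which only works for aggregates that have an inverse (sum does, max/min do not).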




[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #121: Introduce AggregationMergeFunction

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #121:
URL: https://github.com/apache/flink-table-store/pull/121#discussion_r871983265


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AggregationITCase.java:
##########
@@ -0,0 +1,81 @@
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ITCase for partial update.
+ */
+public class AggregationITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T3 (" + "a STRING," + "b INT," + "c INT ," + "PRIMARY KEY (a) NOT ENFORCED )" + " WITH ('merge-engine'='aggregation'     );");

Review Comment:
   The code format is incorrect, and you can run `mvn spotless:apply` to fix it.
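   For illustration, here is a hand-formatted version of the `ddl()` body with the same statement, one clause per line and the stray whitespace removed. This is not actual `spotless:apply` output; the exact layout is whatever the project's formatter produces. `AggregationDdlSketch` is a hypothetical wrapper so the snippet compiles on its own. The only changes to the SQL text are whitespace.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical wrapper around a readably formatted ddl() body (not spotless output). */
final class AggregationDdlSketch {

    static List<String> ddl() {
        // Same statement as in the PR; only whitespace differs.
        return Collections.singletonList(
                "CREATE TABLE IF NOT EXISTS T3 ("
                        + "a STRING, "
                        + "b INT, "
                        + "c INT, "
                        + "PRIMARY KEY (a) NOT ENFORCED)"
                        + " WITH ('merge-engine'='aggregation');");
    }
}
```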


