Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/26 12:38:56 UTC

[GitHub] [flink] beyond1920 opened a new pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

beyond1920 opened a new pull request #17571:
URL: https://github.com/apache/flink/pull/17571


   ## What is the purpose of the change
   
   This pull request aims to implement the streaming row-time window deduplicate operator, which keeps only the first or last row for each key within a window.
   
   
   ## Brief change log
   
     - Add `RowTimeDeduplicateRecordsCombiner`, which stores the first/last record of the incremental input records into the window state.
     - Add `RowTimeWindowDeduplicateProcessor` for row-time window deduplication (its emission side is sketched below this list).
     - Add `RowTimeWindowDeduplicateOperatorBuilder` to build a `SlicingWindowOperator` for row-time window deduplication.
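
   The sketch below shows how the processor's emission side plausibly pairs with the combiner: the combiner keeps one row per key and window in state, and the processor emits that row once the watermark passes the window end. The method shapes, the `collect` helper, and the `clear` call are assumptions for illustration, not the actual `RowTimeWindowDeduplicateProcessor` source.

   ```java
   // Sketch only (assumed shapes, not the actual source).
   // `dataState` is the per-window value state that RowTimeDeduplicateRecordsCombiner fills.
   private WindowValueState<Long> dataState;

   public void fireWindow(Long windowEnd) throws Exception {
       RowData row = dataState.value(windowEnd); // the single first/last row kept for this window
       if (row != null) {
           collect(row); // hypothetical helper that forwards the row to the output
       }
   }

   public void clearWindow(Long windowEnd) throws Exception {
       dataState.clear(windowEnd); // drop the window's state once it has fired
   }
   ```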
   
   
   ## Verifying this change
   
     - Add harness test `RowTimeWindowDeduplicateOperatorTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "954862143",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "8a58dbafe5b759ff4b39c22cfd982f795f2d7044",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25743",
       "triggerID" : "8a58dbafe5b759ff4b39c22cfd982f795f2d7044",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a49b4c966901159870a7f0014b2f4520fc35425e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629) 
   * 8a58dbafe5b759ff4b39c22cfd982f795f2d7044 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25743) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "954862143",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * a49b4c966901159870a7f0014b2f4520fc35425e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-958618670


   Minor: @beyond1920 We don't need to add "blink" to the commit message; the blink planner is now the only planner.





[GitHub] [flink] flinkbot commented on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] beyond1920 commented on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-953555889


   @flinkbot run azure





[GitHub] [flink] JingsongLi merged pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #17571:
URL: https://github.com/apache/flink/pull/17571


   





[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] beyond1920 commented on a change in pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on a change in pull request #17571:
URL: https://github.com/apache/flink/pull/17571#discussion_r740950486



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {

Review comment:
       Maybe it's better to keep the original name because I haven't come up with a better one yet. `shouldUpdate` may mislead readers into thinking we should send an updated result downstream.
   Besides, it's not a big deal because it's an internal method. WDYT?
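
   (For readers following the thread, here is a rough paraphrase of the decision `isDuplicate` makes at these call sites — reconstructed from the usage above, not quoted from `DeduplicateFunctionHelper`, and assuming the rowtime column is read as an epoch-millis long.)

   ```java
   import org.apache.flink.table.data.RowData;

   final class DedupDecisionSketch {
       // Returns true when currentRow should replace the previously kept preRow,
       // i.e. the old row becomes the duplicate to drop.
       static boolean isDuplicate(RowData preRow, RowData currentRow, int rowtimeIndex, boolean keepLastRow) {
           if (preRow == null) {
               return true; // nothing kept yet, always keep the current row
           }
           // assumption: the rowtime attribute is stored as an epoch-millis long
           long preTime = preRow.getLong(rowtimeIndex);
           long curTime = currentRow.getLong(rowtimeIndex);
           // keep-last: a newer (or equal) rowtime wins; keep-first: a strictly older rowtime wins
           return keepLastRow ? curTime >= preTime : curTime < preTime;
       }
   }
   ```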

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {
+                // the incoming record is reused, we should copy it
+                bufferedResult = recordSerializer.copy(record);
+            }
+        }
+        if (bufferedResult == null) {
+            return;
+        }
+        // step 2: flush data into state
+        keyContext.setCurrentKey(windowKey.getKey());
+        Long window = windowKey.getWindow();
+        RowData preRow = dataState.value(window);
+        if (isDuplicate(preRow, bufferedResult, rowtimeIndex, keepLastRow)) {
+            dataState.update(window, bufferedResult);
+        }
+        // step 3: register timer for current window
+        timerService.registerEventTimeWindowTimer(window);

Review comment:
       Yes, as the name says, the operator only works for event time.

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {

Review comment:
       Maybe it's better to keep the original name because I haven't come up with a better one yet. `shouldUpdate` may mislead readers into thinking we should send an updated result downstream.
   Besides, it's not a big deal because it's an internal method. WDYT?







[GitHub] [flink] beyond1920 commented on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-953455966


   @flinkbot run azure





[GitHub] [flink] beyond1920 commented on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-954862143


   @flinkbot run azure





[GitHub] [flink] tsreaper commented on a change in pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17571:
URL: https://github.com/apache/flink/pull/17571#discussion_r740698131



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {
+                // the incoming record is reused, we should copy it
+                bufferedResult = recordSerializer.copy(record);
+            }
+        }
+        if (bufferedResult == null) {
+            return;
+        }
+        // step 2: flush data into state
+        keyContext.setCurrentKey(windowKey.getKey());
+        Long window = windowKey.getWindow();
+        RowData preRow = dataState.value(window);
+        if (isDuplicate(preRow, bufferedResult, rowtimeIndex, keepLastRow)) {
+            dataState.update(window, bufferedResult);
+        }
+        // step 3: register timer for current window
+        timerService.registerEventTimeWindowTimer(window);

Review comment:
       Is this operator for event time only? Other combiners like `TopNRecordsCombiner` will check for event time or processing time before registering the timer.
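
   (A minimal sketch of the guard pattern referred to here; the `isEventTime` flag and the processing-time registration call are assumptions for illustration, not the actual `TopNRecordsCombiner` code.)

   ```java
   // Sketch only: a combiner supporting both time domains would carry a flag set at
   // construction time and register only the timer that matches the time attribute.
   if (isEventTime) {
       timerService.registerEventTimeWindowTimer(window);
   } else {
       // assumed counterpart call; check WindowTimerService for the exact method name
       timerService.registerProcessingTimeWindowTimer(window);
   }
   ```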

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {

Review comment:
       nit: This `isDuplicate` method name is sort of misleading: it reads as if a `true` return means we should not update the result, while the code does the opposite. I'd like to propose a name like `shouldUpdate` or similar.







[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951899666


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 62c680fc5948737a1cfce1730886f54501deff84 (Tue Oct 26 12:41:22 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
     The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





+                // the incoming record is reused, we should copy it
+                bufferedResult = recordSerializer.copy(record);
+            }
+        }
+        if (bufferedResult == null) {
+            return;
+        }
+        // step 2: flush data into state
+        keyContext.setCurrentKey(windowKey.getKey());
+        Long window = windowKey.getWindow();
+        RowData preRow = dataState.value(window);
+        if (isDuplicate(preRow, bufferedResult, rowtimeIndex, keepLastRow)) {
+            dataState.update(window, bufferedResult);
+        }
+        // step 3: register timer for current window
+        timerService.registerEventTimeWindowTimer(window);

Review comment:
       Yes, as the name says, the operator only works for event time.
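
   To make the role of this timer concrete, here is a hedged sketch of the fire path such a registration implies: when the event-time window timer fires, the stored first/last row for that window is read from the value state, emitted once, and the window's state is cleaned up. `FirePathSketch` and its `SingleRowWindowState` interface below are stand-ins for illustration, not the actual processor classes of this PR:

   ```java
   import org.apache.flink.table.data.RowData;
   import org.apache.flink.util.Collector;

   /** Hedged sketch of emitting the deduplicated row when a window timer fires. */
   final class FirePathSketch {

       /** Minimal stand-in for a per-window value state holding one row. */
       interface SingleRowWindowState {
           RowData value(Long window) throws Exception;

           void clear(Long window) throws Exception;
       }

       static void onWindowFire(
               Long window, SingleRowWindowState dataState, Collector<RowData> out)
               throws Exception {
           RowData row = dataState.value(window);
           if (row != null) {
               // emit the single deduplicated row once the watermark has passed the window end
               out.collect(row);
           }
           // the window has fired, its state is no longer needed
           dataState.clear(window);
       }
   }
   ```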

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {

Review comment:
       Maybe it's better to keep the original name because I could not come up with a better one yet. `shouldUpdate` may be misleading, suggesting that we should send an update result to downstream.
   Besides, it's not a big deal because it's an internal method. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-958618670


   Minor: @beyond1920 We don't need to add "blink" in the commit message; the blink planner is the only planner.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi merged pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #17571:
URL: https://github.com/apache/flink/pull/17571


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "954862143",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "8a58dbafe5b759ff4b39c22cfd982f795f2d7044",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25743",
       "triggerID" : "8a58dbafe5b759ff4b39c22cfd982f795f2d7044",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8a58dbafe5b759ff4b39c22cfd982f795f2d7044 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25743) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   * a49b4c966901159870a7f0014b2f4520fc35425e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "954862143",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * a49b4c966901159870a7f0014b2f4520fc35425e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] beyond1920 commented on a change in pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on a change in pull request #17571:
URL: https://github.com/apache/flink/pull/17571#discussion_r740950808



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {
+                // the incoming record is reused, we should copy it
+                bufferedResult = recordSerializer.copy(record);
+            }
+        }
+        if (bufferedResult == null) {
+            return;
+        }
+        // step 2: flush data into state
+        keyContext.setCurrentKey(windowKey.getKey());
+        Long window = windowKey.getWindow();
+        RowData preRow = dataState.value(window);
+        if (isDuplicate(preRow, bufferedResult, rowtimeIndex, keepLastRow)) {
+            dataState.update(window, bufferedResult);
+        }
+        // step 3: register timer for current window
+        timerService.registerEventTimeWindowTimer(window);

Review comment:
       Yes, as the name says, the operator only works for event time.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   * a49b4c966901159870a7f0014b2f4520fc35425e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a49b4c966901159870a7f0014b2f4520fc35425e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "954862143",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * a49b4c966901159870a7f0014b2f4520fc35425e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953555889",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a49b4c966901159870a7f0014b2f4520fc35425e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629",
       "triggerID" : "954862143",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "8a58dbafe5b759ff4b39c22cfd982f795f2d7044",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8a58dbafe5b759ff4b39c22cfd982f795f2d7044",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a49b4c966901159870a7f0014b2f4520fc35425e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25629) 
   * 8a58dbafe5b759ff4b39c22cfd982f795f2d7044 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tsreaper commented on a change in pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17571:
URL: https://github.com/apache/flink/pull/17571#discussion_r740698131



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {
+                // the incoming record is reused, we should copy it
+                bufferedResult = recordSerializer.copy(record);
+            }
+        }
+        if (bufferedResult == null) {
+            return;
+        }
+        // step 2: flush data into state
+        keyContext.setCurrentKey(windowKey.getKey());
+        Long window = windowKey.getWindow();
+        RowData preRow = dataState.value(window);
+        if (isDuplicate(preRow, bufferedResult, rowtimeIndex, keepLastRow)) {
+            dataState.update(window, bufferedResult);
+        }
+        // step 3: register timer for current window
+        timerService.registerEventTimeWindowTimer(window);

Review comment:
       Is this operator for event time only? Other combiners like `TopNRecordsCombiner` will check for event time or processing time before registering the timer.
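
   As a rough illustration of the check-then-register pattern mentioned above, a combiner that supports both time semantics could branch before registering the timer; the `isEventTime` flag below is a stand-in for whatever the windowing spec exposes, not a field from this PR:

   ```java
   import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;

   /** Hedged sketch of conditional window-timer registration. */
   final class TimerRegistrationSketch {
       static void registerWindowTimer(
               WindowTimerService<Long> timerService, Long window, boolean isEventTime) {
           if (isEventTime) {
               // rowtime windows fire when the watermark passes the window end
               timerService.registerEventTimeWindowTimer(window);
           } else {
               // a processing-time variant would register a processing-time timer here;
               // the exact call is omitted since the combiner in this PR is rowtime-only
           }
       }
   }
   ```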

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {

Review comment:
       nit: This `isDuplicate` method name is sort of misleading. It seems like if it returns true we should not update the result. I'd like to propose a name like `shouldUpdate` or such.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17571:
URL: https://github.com/apache/flink/pull/17571#issuecomment-951900326


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "62c680fc5948737a1cfce1730886f54501deff84",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62c680fc5948737a1cfce1730886f54501deff84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470",
       "triggerID" : "953455966",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 62c680fc5948737a1cfce1730886f54501deff84 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25470) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] beyond1920 commented on a change in pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on a change in pull request #17571:
URL: https://github.com/apache/flink/pull/17571#discussion_r740950486



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {

Review comment:
       Maybe it's better to keep the original name because I could not come up with a better one yet. `shouldUpdate` may be misleading, suggesting that we should send an update result to downstream.
   Besides, it's not a big deal because it's an internal method. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tsreaper commented on a change in pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17571:
URL: https://github.com/apache/flink/pull/17571#discussion_r740698131



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {
+                // the incoming record is reused, we should copy it
+                bufferedResult = recordSerializer.copy(record);
+            }
+        }
+        if (bufferedResult == null) {
+            return;
+        }
+        // step 2: flush data into state
+        keyContext.setCurrentKey(windowKey.getKey());
+        Long window = windowKey.getWindow();
+        RowData preRow = dataState.value(window);
+        if (isDuplicate(preRow, bufferedResult, rowtimeIndex, keepLastRow)) {
+            dataState.update(window, bufferedResult);
+        }
+        // step 3: register timer for current window
+        timerService.registerEventTimeWindowTimer(window);

Review comment:
       Is this operator for event time only? Other combiners like `TopNRecordsCombiner` check whether the window is in event time or processing time before registering the timer.
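
   For illustration, the kind of guard that comment refers to could look roughly like the sketch below. The `isEventTime` flag is assumed to be supplied by the combiner's factory (it is not part of this diff), and the processing-time registration method is likewise an assumption here:

       import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;

       final class TimerRegistrationSketch {

           // Sketch only: register the timer matching the window's time attribute.
           static void registerWindowTimer(
                   WindowTimerService<Long> timerService, Long window, boolean isEventTime) {
               if (isEventTime) {
                   // event-time windows fire when the watermark passes the window end
                   timerService.registerEventTimeWindowTimer(window);
               } else {
                   // processing-time windows fire on wall-clock timers
                   timerService.registerProcessingTimeWindowTimer(window);
               }
           }
       }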

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/window/combines/RowTimeDeduplicateRecordsCombiner.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate.window.combines;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.combines.RecordsCombiner;
+import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
+import org.apache.flink.table.runtime.operators.window.state.StateKeyContext;
+import org.apache.flink.table.runtime.operators.window.state.WindowState;
+import org.apache.flink.table.runtime.operators.window.state.WindowValueState;
+import org.apache.flink.table.runtime.util.WindowKey;
+
+import java.util.Iterator;
+
+import static org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg;
+import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.isDuplicate;
+
+/**
+ * An implementation of {@link RecordsCombiner} that stores the first/last records of incremental
+ * input records into the window state.
+ */
+public final class RowTimeDeduplicateRecordsCombiner implements RecordsCombiner {
+
+    /** The service to register event-time or processing-time timers. */
+    private final WindowTimerService<Long> timerService;
+
+    /** Context to switch current key for states. */
+    private final StateKeyContext keyContext;
+
+    /** The state stores first/last record of each window. */
+    private final WindowValueState<Long> dataState;
+
+    private final int rowtimeIndex;
+
+    private final boolean keepLastRow;
+
+    /** Serializer to copy record if required. */
+    private final TypeSerializer<RowData> recordSerializer;
+
+    public RowTimeDeduplicateRecordsCombiner(
+            WindowTimerService<Long> timerService,
+            StateKeyContext keyContext,
+            WindowValueState<Long> dataState,
+            int rowtimeIndex,
+            boolean keepLastRow,
+            TypeSerializer<RowData> recordSerializer) {
+        this.timerService = timerService;
+        this.keyContext = keyContext;
+        this.dataState = dataState;
+        this.rowtimeIndex = rowtimeIndex;
+        this.keepLastRow = keepLastRow;
+        this.recordSerializer = recordSerializer;
+    }
+
+    @Override
+    public void combine(WindowKey windowKey, Iterator<RowData> records) throws Exception {
+        // step 1: get first/last record of incremental data
+        RowData bufferedResult = null;
+        while (records.hasNext()) {
+            RowData record = records.next();
+            if (!isAccumulateMsg(record)) {
+                throw new UnsupportedOperationException(
+                        "Window deduplicate does not support input RowKind: "
+                                + record.getRowKind().shortString());
+            }
+            if (isDuplicate(bufferedResult, record, rowtimeIndex, keepLastRow)) {

Review comment:
       nit: The `isDuplicate` method name is a bit misleading. It reads as if, when it returns true, we should not update the result. I'd like to propose a name like `shouldUpdate` or similar.
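
   For comparison, with the proposed name the two call sites in `combine` would read roughly as follows (rename sketch only, behaviour unchanged; `shouldUpdate` is the hypothetical name from the suggestion above):

       // step 1: pick the first/last record of the incremental data
       if (shouldUpdate(bufferedResult, record, rowtimeIndex, keepLastRow)) {
           // the incoming record is reused, so copy it before buffering
           bufferedResult = recordSerializer.copy(record);
       }

       // step 2: flush data into state
       if (shouldUpdate(preRow, bufferedResult, rowtimeIndex, keepLastRow)) {
           dataState.update(window, bufferedResult);
       }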







[GitHub] [flink] JingsongLi merged pull request #17571: [FLINK-23015][table-runtime-blink] Implement streaming window Deduplicate operator

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #17571:
URL: https://github.com/apache/flink/pull/17571


   

