Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/27 09:59:37 UTC

[GitHub] [flink-table-store] JingsongLi opened a new pull request, #248: [FLINK-28712] Default Changelog all when changelog producer is input

JingsongLi opened a new pull request, #248:
URL: https://github.com/apache/flink-table-store/pull/248

   When the changelog producer is `input`, it is implied that the file already contains all the changelogs.
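A minimal Java sketch of the defaulting rule named in the title, assuming a table with a primary key and assuming `log.changelog-mode` accepts `auto`, `all` and `upsert`. The class and enum names are hypothetical; this models the described behaviour and is not the table store's actual implementation:

```java
import java.util.Objects;

/** Hypothetical model of the FLINK-28712 default; not the table store's code. */
final class ChangelogModeDefaults {

    /** Values of the 'changelog-producer' option used in this thread. */
    enum ChangelogProducer { NONE, INPUT }

    /** Assumed values of the 'log.changelog-mode' option. */
    enum LogChangelogMode { AUTO, ALL, UPSERT }

    private ChangelogModeDefaults() {}

    /**
     * Resolves the effective log changelog mode for a primary-key table.
     * An explicit ALL or UPSERT is kept as configured; AUTO becomes ALL when the
     * input is trusted to be a complete changelog, otherwise UPSERT.
     */
    static LogChangelogMode resolve(ChangelogProducer producer, LogChangelogMode configured) {
        Objects.requireNonNull(producer, "producer");
        Objects.requireNonNull(configured, "configured");
        if (configured != LogChangelogMode.AUTO) {
            return configured;
        }
        // Assumption: with 'input' the files already hold every change,
        // including UPDATE_BEFORE, so the full changelog can be emitted.
        return producer == ChangelogProducer.INPUT
                ? LogChangelogMode.ALL
                : LogChangelogMode.UPSERT;
    }
}
```

Under this rule, setting `'changelog-producer' = 'input'` alone is enough; an explicit `'log.changelog-mode' = 'all'` becomes redundant.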
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] JingsongLi merged pull request #248: [FLINK-28712] Default Changelog all when changelog producer is input

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #248:
URL: https://github.com/apache/flink-table-store/pull/248




[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #248: [FLINK-28712] Default Changelog all when changelog producer is input

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #248:
URL: https://github.com/apache/flink-table-store/pull/248#discussion_r931754507


##########
docs/content/docs/development/streaming-query.md:
##########
@@ -75,17 +75,39 @@ streaming mode. This mode has a lower cost compared to Kafka but has a higher la
 depending on the checkpoint interval of the writing job.
 
 By default, the downstream streaming consumption is disordered (ordered within the key)
-stream of upsert data. If you expect an ordered CDC data stream, you can configure it
-as follows (recommended):
+stream of upsert data. If you expect an ordered CDC data stream, and remove downstream
+changelog normalized operator (which is costly), you can configure it as follows
+(Recommended, but this requires that your input is inclusive of all changelogs):
 
 ```sql
 CREATE TABLE T (...)
 WITH (
-    'changelog-producer' = 'input',
-    'log.changelog-mode' = 'all'
+    'changelog-producer' = 'input'
 )
 ```
 
+You can understand the difference between changelog-producer(none) and changelog-producer(input) by the following pictures:
+
+{{< img src="/img/changelog-producer-none.png">}}
+
+When the changelog-producer is none, because the storage only retains the upsert data and
+does not have the full changelog data containing update_before, so the downstream consumption job needs
+to use the normalized node to generate the complete changelog.
+
+{{< hint info >}}
+__Note:__ The normalized node needs to persist all the data into the state, which is very costly.
+{{< /hint >}}
+
+{{< img src="/img/changelog-producer-input.png">}}
+
+When the changelog-producer is input, the storage trust input data is a complete changelog and

Review Comment:
   ```suggestion
   When the changelog-producer is `INPUT`, the storage trusts input data is saved as a complete changelog so that ...
   ```
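To make the cost in the note above concrete, a minimal Java sketch of what a normalize step has to do when the storage keeps only upsert data (`changelog-producer` = none): hold the latest value per key in state and recover UPDATE_BEFORE from it. `UpsertNormalizer` and its methods are hypothetical; this is a simplified model, not Flink's normalize operator:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of a changelog-normalize step over upsert input. */
final class UpsertNormalizer {

    /** Row kinds named after Flink's +I / -U / +U / -D notation. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** One emitted change: row kind plus key and value. */
    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind + "(" + key + ", " + value + ")";
        }
    }

    // The state: the latest value of every live key. It grows with the number
    // of distinct keys, which is what makes this kind of operator expensive.
    private final Map<String, String> state = new HashMap<>();

    /** Processes an upsert of key -> value; a null value means the key was deleted. */
    List<Change> upsert(String key, String value) {
        List<Change> out = new ArrayList<>();
        String previous = state.get(key);
        if (value == null) {
            // Delete: emit the old row (if any) so downstream can retract it.
            if (previous != null) {
                state.remove(key);
                out.add(new Change(Kind.DELETE, key, previous));
            }
            return out;
        }
        if (previous == null) {
            out.add(new Change(Kind.INSERT, key, value));
        } else {
            // The upsert stream carries only the new value; UPDATE_BEFORE
            // has to be rebuilt from state.
            out.add(new Change(Kind.UPDATE_BEFORE, key, previous));
            out.add(new Change(Kind.UPDATE_AFTER, key, value));
        }
        state.put(key, value);
        return out;
    }

    /** Number of keys currently held in state. */
    int stateSize() {
        return state.size();
    }
}
```

With `changelog-producer` = input, the stored records already contain UPDATE_BEFORE, so this per-key state is not needed downstream.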





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #248: [FLINK-28712] Default Changelog all when changelog producer is input

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #248:
URL: https://github.com/apache/flink-table-store/pull/248#discussion_r931142441


##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogModeTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.connector.source.TableStoreSource;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for changelog mode with flink source and sink. */
+public class ChangelogModeTest {
+
+    @TempDir java.nio.file.Path temp;
+
+    private final ObjectIdentifier identifier = ObjectIdentifier.of("c", "d", "t");
+
+    private Path path;
+
+    @BeforeEach
+    public void beforeEach() {
+        path = new Path(temp.toUri().toString());
+    }
+
+    private void test(Configuration options, ChangelogMode expectSource, ChangelogMode expectSink)
+            throws Exception {
+        test(options, expectSource, expectSink, null);
+    }
+
+    private void test(
+            Configuration options,
+            ChangelogMode expectSource,
+            ChangelogMode expectSink,
+            LogStoreTableFactory logStoreTableFactory)

Review Comment:
   Nit: add a `@Nullable` annotation.
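A dependency-free Java sketch of the pattern behind this nit: the short overload passes `null`, and the long overload marks that parameter as nullable and handles it explicitly. Flink code commonly imports `javax.annotation.Nullable`; here a local annotation of the same name stands in so the snippet compiles on its own, and `NullableParamExample`/`describe` are hypothetical:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Local stand-in for javax.annotation.Nullable so the sketch has no dependencies. */
@Documented
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.PARAMETER, ElementType.FIELD, ElementType.METHOD})
@interface Nullable {}

/** Hypothetical helper mirroring the shape of the overloaded test(...) methods. */
final class NullableParamExample {

    private NullableParamExample() {}

    /** Convenience overload: no log store factory. */
    static String describe(String options) {
        return describe(options, null);
    }

    /** The annotation documents that null is an expected argument here. */
    static String describe(String options, @Nullable Object logStoreTableFactory) {
        // Because the parameter is nullable, the body must branch on null.
        String logStore =
                logStoreTableFactory == null ? "no log store" : "log store " + logStoreTableFactory;
        return options + " with " + logStore;
    }
}
```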





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #248: [FLINK-28712] Default Changelog all when changelog producer is input

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #248:
URL: https://github.com/apache/flink-table-store/pull/248#discussion_r931754507


##########
docs/content/docs/development/streaming-query.md:
##########
@@ -75,17 +75,39 @@ streaming mode. This mode has a lower cost compared to Kafka but has a higher la
 depending on the checkpoint interval of the writing job.
 
 By default, the downstream streaming consumption is disordered (ordered within the key)
-stream of upsert data. If you expect an ordered CDC data stream, you can configure it
-as follows (recommended):
+stream of upsert data. If you expect an ordered CDC data stream, and remove downstream
+changelog normalized operator (which is costly), you can configure it as follows
+(Recommended, but this requires that your input is inclusive of all changelogs):
 
 ```sql
 CREATE TABLE T (...)
 WITH (
-    'changelog-producer' = 'input',
-    'log.changelog-mode' = 'all'
+    'changelog-producer' = 'input'
 )
 ```
 
+You can understand the difference between changelog-producer(none) and changelog-producer(input) by the following pictures:
+
+{{< img src="/img/changelog-producer-none.png">}}
+
+When the changelog-producer is none, because the storage only retains the upsert data and
+does not have the full changelog data containing update_before, so the downstream consumption job needs
+to use the normalized node to generate the complete changelog.
+
+{{< hint info >}}
+__Note:__ The normalized node needs to persist all the data into the state, which is very costly.
+{{< /hint >}}
+
+{{< img src="/img/changelog-producer-input.png">}}
+
+When the changelog-producer is input, the storage trust input data is a complete changelog and

Review Comment:
   ```suggestion
   When the changelog-producer is input, the storage trusts input data is saved as a complete changelog so that ...
   ```





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #248: [FLINK-28712] Default Changelog all when changelog producer is input

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #248:
URL: https://github.com/apache/flink-table-store/pull/248#discussion_r931133285


##########
docs/content/docs/development/streaming-query.md:
##########
@@ -75,14 +75,14 @@ streaming mode. This mode has a lower cost compared to Kafka but has a higher la
 depending on the checkpoint interval of the writing job.
 
 By default, the downstream streaming consumption is disordered (ordered within the key)
-stream of upsert data. If you expect an ordered CDC data stream, you can configure it
-as follows (recommended):
+stream of upsert data. If you expect an ordered CDC data stream, and remove downstream

Review Comment:
   How about
   ![image](https://user-images.githubusercontent.com/55568005/181276352-763323b7-02ac-48a8-ae96-b8380bddf3e1.png)
   ![image](https://user-images.githubusercontent.com/55568005/181276436-a819a7b3-d939-4190-9470-1499b8c14b51.png)
   





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #248: [FLINK-28712] Default Changelog all when changelog producer is input

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #248:
URL: https://github.com/apache/flink-table-store/pull/248#discussion_r931091615


##########
docs/content/docs/development/streaming-query.md:
##########
@@ -75,14 +75,14 @@ streaming mode. This mode has a lower cost compared to Kafka but has a higher la
 depending on the checkpoint interval of the writing job.
 
 By default, the downstream streaming consumption is disordered (ordered within the key)
-stream of upsert data. If you expect an ordered CDC data stream, you can configure it
-as follows (recommended):
+stream of upsert data. If you expect an ordered CDC data stream, and remove downstream

Review Comment:
   I feel this is a little hard to understand from a user's perspective. A more intuitive approach would be to add an example and some pictures to illustrate it.
   
   For example, we could use `ChangelogWithKeyFileStoreTableTest#testStreamingChangelog` to explain the difference in the final result between enabling and disabling `ChangelogProducer.INPUT`.
   
   Meanwhile, add some pictures to illustrate the difference in processing.
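Following the suggestion to show the difference in the final result, a minimal Java sketch under a simplifying assumption: with `none` the table keeps only the latest row per key (upsert data), while with `input` it keeps the incoming changelog as-is. The row sequence in the usage note is made up for illustration and is not the data in `testStreamingChangelog`; all class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified contrast between 'none' and 'input' for a primary-key table. */
final class ProducerComparison {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Row {
        final Kind kind;
        final String key;
        final int value;

        Row(Kind kind, String key, int value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    private ProducerComparison() {}

    /**
     * 'none' (simplified): only upsert data survives, i.e. the latest row per key.
     * UPDATE_BEFORE rows are not stored, so a reader has to rebuild them.
     */
    static List<Row> storedWithNone(List<Row> input) {
        // LinkedHashMap keeps keys in first-seen order for a stable result.
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row row : input) {
            if (row.kind == Kind.DELETE) {
                latest.remove(row.key);
            } else if (row.kind != Kind.UPDATE_BEFORE) {
                latest.put(row.key, row);
            }
        }
        return new ArrayList<>(latest.values());
    }

    /** 'input': the input is trusted as a complete changelog and kept as-is. */
    static List<Row> storedWithInput(List<Row> input) {
        return new ArrayList<>(input);
    }

    /** Counts UPDATE_BEFORE rows, i.e. what a consumer would otherwise have to derive. */
    static long countUpdateBefore(List<Row> rows) {
        return rows.stream().filter(r -> r.kind == Kind.UPDATE_BEFORE).count();
    }
}
```

Fed `+I(a,1), -U(a,1), +U(a,2), +I(b,5)`, the `none` model keeps two rows and no UPDATE_BEFORE, while the `input` model keeps all four rows, which is why the downstream normalize step can be dropped in the second case.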





[GitHub] [flink-table-store] LadyForest commented on a diff in pull request #248: [FLINK-28712] Default Changelog all when changelog producer is input

Posted by GitBox <gi...@apache.org>.
LadyForest commented on code in PR #248:
URL: https://github.com/apache/flink-table-store/pull/248#discussion_r931133285


##########
docs/content/docs/development/streaming-query.md:
##########
@@ -75,14 +75,14 @@ streaming mode. This mode has a lower cost compared to Kafka but has a higher la
 depending on the checkpoint interval of the writing job.
 
 By default, the downstream streaming consumption is disordered (ordered within the key)
-stream of upsert data. If you expect an ordered CDC data stream, you can configure it
-as follows (recommended):
+stream of upsert data. If you expect an ordered CDC data stream, and remove downstream

Review Comment:
   How about
   ![image](https://user-images.githubusercontent.com/55568005/181272707-e1b06183-2c6b-454a-bedf-aee9e1504de9.png)
   
   ![image](https://user-images.githubusercontent.com/55568005/181272515-c579095d-6e8e-42fb-b4f5-6d13aaa10cd7.png)
   


