Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/11/21 02:25:37 UTC

[GitHub] [inlong] e-mhui opened a new pull request, #6580: [INLONG-6578][Sort] Fix the problem of write failure after Doris sets the 'sink.properties.columns' parameter

e-mhui opened a new pull request, #6580:
URL: https://github.com/apache/inlong/pull/6580

   ### Fix the problem of write failure after Doris sets the 'sink.properties.columns' parameter
   
   - Fixes #6578
   
   ### Motivation
   
   To write bitmap data to Doris, we need to set the `sink.properties.columns` option.
   
   ```sql
   CREATE TABLE `table_2`(
       `dt` INT,
       `page` STRING,
       `user_id` INT)
       WITH (
       'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=2',
       'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)',
       'connector' = 'doris-inlong',
       'fenodes' = 'xxxx:8030',
       'username' = 'root',
       'password' = 'xxx',
       'sink.multiple.enable' = 'false',
       'table.identifier' = 'db.table'
   )
   ```
   
   However, when writing data to Doris, the connector adds a hidden column called `__DORIS_DELETE_SIGN__` to each row. For more details, see [batch-delete-manual](https://doris.apache.org/docs/data-operate/update-delete/batch-delete-manual/). The source code is as follows:
   
   ```java
   // add doris delete sign
   if (enableBatchDelete()) {
       if (jsonFormat) {
           valueMap.put(DORIS_DELETE_SIGN, parseDeleteSign(rowData.getRowKind()));
       } else {
           value.add(parseDeleteSign(rowData.getRowKind()));
       }
   }
   ```
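A minimal sketch of what the snippet above does in the non-JSON branch: one extra trailing field carrying the delete sign is appended to every row. `ChangeKind` is a hypothetical stand-in for Flink's `RowKind`, and the mapping (`0` for rows to keep, `1` for rows to delete, with `UPDATE_BEFORE` treated as a delete) is an assumption about `parseDeleteSign`, not a copy of the connector code.

```java
import java.util.ArrayList;
import java.util.List;

final class DeleteSignExample {

    /** Hypothetical stand-in for Flink's RowKind, so the sketch has no dependencies. */
    enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Assumed mapping: "0" keeps the row, "1" marks it for deletion. */
    static String deleteSignFor(ChangeKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return "0";
            case UPDATE_BEFORE:
            case DELETE:
                return "1";
            default:
                throw new IllegalArgumentException("Unsupported change kind: " + kind);
        }
    }

    /** Appends the delete sign as the last field, mirroring value.add(...) above. */
    static String toCsvLine(List<String> values, ChangeKind kind) {
        List<String> fields = new ArrayList<>(values);
        fields.add(deleteSignFor(kind));
        return String.join(",", fields);
    }

    public static void main(String[] args) {
        // Three declared columns, but four fields reach Doris.
        System.out.println(toCsvLine(List.of("20221121", "home", "42"), ChangeKind.INSERT));
    }
}
```

Because every row gains a fourth field, a `columns` mapping that names only three columns no longer lines up with the data being loaded.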
   
   So we need to append `__DORIS_DELETE_SIGN__` to the end of the `sink.properties.columns` value, which tells Doris that the extra trailing field is the `__DORIS_DELETE_SIGN__` column. For example:
   
   ```sql
   'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id),__DORIS_DELETE_SIGN__'
   ```
   
   ### Modifications
   
   If the user has set the `sink.properties.columns` option, we check whether batch delete is enabled. If it is, we append the hidden column `__DORIS_DELETE_SIGN__` to the columns parameter. The code is as follows:
   
   ```java
   Properties loadProperties = executionOptions.getStreamLoadProp();
   // If batch delete is enabled, the columns value must include '__DORIS_DELETE_SIGN__'
   String columns = (String) loadProperties.get(COLUMNS_KEY);
   if (loadProperties.containsKey(COLUMNS_KEY) && !columns.contains(DORIS_DELETE_SIGN)
           && enableBatchDelete()) {
       loadProperties.put(COLUMNS_KEY, String.format("%s,%s", columns, DORIS_DELETE_SIGN));
   }
   
   private boolean enableBatchDelete() {
       try {
           Schema schema = RestService.getSchema(options, readOptions, LOG);
           return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
       } catch (DorisException e) {
           throw new RuntimeException("Failed to fetch Doris table schema: " + options.getTableIdentifier(), e);
       }
   }
   ```
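The append step can be tried in isolation. The following is a sketch only: it keeps the `COLUMNS_KEY` and `DORIS_DELETE_SIGN` constants from the change, but the class name `DeleteSignColumns`, the method `appendDeleteSign`, and the boolean parameter that replaces the REST-backed `enableBatchDelete()` call are hypothetical.

```java
import java.util.Properties;

final class DeleteSignColumns {

    static final String COLUMNS_KEY = "columns";
    static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";

    /**
     * Appends the hidden delete-sign column to the "columns" property when the
     * property is set, batch delete is enabled, and the column is not already listed.
     */
    static void appendDeleteSign(Properties loadProperties, boolean batchDeleteEnabled) {
        String columns = loadProperties.getProperty(COLUMNS_KEY);
        if (columns == null || !batchDeleteEnabled || columns.contains(DORIS_DELETE_SIGN)) {
            return; // nothing to do, or the column is already present
        }
        loadProperties.setProperty(COLUMNS_KEY, columns + "," + DORIS_DELETE_SIGN);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(COLUMNS_KEY, "dt,page,user_id,user_id=to_bitmap(user_id)");
        appendDeleteSign(props, true);
        // prints: dt,page,user_id,user_id=to_bitmap(user_id),__DORIS_DELETE_SIGN__
        System.out.println(props.getProperty(COLUMNS_KEY));
    }
}
```

Checking for an existing `__DORIS_DELETE_SIGN__` entry keeps the step idempotent, so a user who already lists the column by hand does not end up with it twice.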


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] e-mhui commented on a diff in pull request #6580: [INLONG-6578][Sort] Fix the write failure after the Doris sets 'sink.properties.columns' option

Posted by GitBox <gi...@apache.org>.
e-mhui commented on code in PR #6580:
URL: https://github.com/apache/inlong/pull/6580#discussion_r1027555213


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java:
##########
@@ -125,5 +143,14 @@ public DynamicTableSink copy() {
     public String asSummaryString() {
         return "Doris Table Sink Of InLong";
     }
+
+    private boolean enableBatchDelete() {
+        try {
+            Schema schema = RestService.getSchema(options, readOptions, LOG);
+            return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
+        } catch (DorisException e) {
+            throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier());
+        }

Review Comment:
   done.



##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java:
##########
@@ -47,6 +55,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     private final String inlongMetric;
     private final String auditHostAndPorts;
     private final Integer parallelism;
+    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
+    public static final String COLUMNS_KEY = "columns";
+    public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";

Review Comment:
   done.





[GitHub] [inlong] gong commented on a diff in pull request #6580: [INLONG-6578][Sort] Fix the write failure after the Doris sets 'sink.properties.columns' option

Posted by GitBox <gi...@apache.org>.
gong commented on code in PR #6580:
URL: https://github.com/apache/inlong/pull/6580#discussion_r1027502591


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java:
##########
@@ -125,5 +143,14 @@ public DynamicTableSink copy() {
     public String asSummaryString() {
         return "Doris Table Sink Of InLong";
     }
+
+    private boolean enableBatchDelete() {
+        try {
+            Schema schema = RestService.getSchema(options, readOptions, LOG);
+            return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
+        } catch (DorisException e) {
+            throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier());
+        }

Review Comment:
   I suggest printing the original `DorisException`.
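
One way to follow this suggestion is to pass the caught exception as the cause of the `RuntimeException`, so the original stack trace is preserved. This is a sketch under assumptions: `SchemaFetchException` and `SchemaFetcher` are hypothetical stand-ins for `DorisException` and `RestService.getSchema`, so that it compiles without the connector on the classpath.

```java
final class SchemaLookup {

    /** Hypothetical stand-in for the connector's checked DorisException. */
    static class SchemaFetchException extends Exception {
        SchemaFetchException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the REST call that returns the table's keys type. */
    interface SchemaFetcher {
        String fetchKeysType() throws SchemaFetchException;
    }

    static boolean isUniqueKeyTable(SchemaFetcher fetcher, String tableIdentifier) {
        try {
            return "UNIQUE_KEYS".equals(fetcher.fetchKeysType());
        } catch (SchemaFetchException e) {
            // Passing e as the cause keeps the original message and stack trace.
            throw new RuntimeException("Failed to fetch Doris table schema: " + tableIdentifier, e);
        }
    }

    public static void main(String[] args) {
        try {
            isUniqueKeyTable(() -> {
                throw new SchemaFetchException("connection refused");
            }, "db.table");
        } catch (RuntimeException e) {
            // The wrapped exception is still reachable through getCause().
            System.out.println(e.getMessage() + " / cause: " + e.getCause().getMessage());
        }
    }
}
```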





[GitHub] [inlong] healchow commented on a diff in pull request #6580: [INLONG-6578][Sort] Fix the write failure after the Doris sets 'sink.properties.columns' option

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6580:
URL: https://github.com/apache/inlong/pull/6580#discussion_r1027499535


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java:
##########
@@ -47,6 +55,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     private final String inlongMetric;
     private final String auditHostAndPorts;
     private final Integer parallelism;
+    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
+    public static final String COLUMNS_KEY = "columns";
+    public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";

Review Comment:
   Do these two constants need to be set to `public`?





[GitHub] [inlong] dockerzhang merged pull request #6580: [INLONG-6578][Sort] Fix the write failure after the Doris sets 'sink.properties.columns' option

Posted by GitBox <gi...@apache.org>.
dockerzhang merged PR #6580:
URL: https://github.com/apache/inlong/pull/6580

