Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2023/01/04 11:09:10 UTC

[GitHub] [inlong] Yizhou-Yang opened a new pull request, #7148: [INLONG-7146] [Sort] adjust dirty data doris dynamic error handling strategy

Yizhou-Yang opened a new pull request, #7148:
URL: https://github.com/apache/inlong/pull/7148

   ### Prepare a Pull Request
   
   - Fixes [[Improve] [Sort] adjust dirty data doris dynamic error handling strategy #7146](https://github.com/apache/inlong/issues/7082)
   
   ### Motivation
   
   The original Doris dirty-data handling needs additional fine-tuning of its error-catching logic.
   This PR is risky, since it is likely to cause problems at runtime, but it is necessary.
   It needs more fine-tuning and should not be merged for at least a week.
   
   ### Modifications
   1) Dirty data and helper: format the dirty data manually, to avoid the formatting problems caused by contention over the ${} regex in PatternReplaceUtils and by regex parsing in the multiple sink.
   
   2) Doris dynamic sink: adjusted the RespContent handling so that dirty data is archived correctly.
   
   3) Removed many log.error calls: once data is archived as dirty, the user does not need to receive the related exceptions.
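The PR text does not show the new formatting code, so the following is only a sketch of what "format the dirty data manually" in item 1 could look like: `${name}` placeholders are substituted by plain `indexOf` scanning, with no regex involved. The class `ManualLabelFormatter`, its `format` method, and the placeholder names are hypothetical and are not taken from InLong.

```java
import java.util.Map;

/**
 * Hypothetical helper: substitutes ${key} placeholders by plain string scanning,
 * without java.util.regex. Unknown keys and unterminated placeholders are left as-is.
 */
final class ManualLabelFormatter {

    private ManualLabelFormatter() {
    }

    static String format(String template, Map<String, String> params) {
        StringBuilder out = new StringBuilder(template.length());
        int pos = 0;
        while (pos < template.length()) {
            int start = template.indexOf("${", pos);
            if (start < 0) {
                // no more placeholders: copy the rest verbatim
                out.append(template, pos, template.length());
                break;
            }
            int end = template.indexOf('}', start + 2);
            if (end < 0) {
                // unterminated placeholder: copy the rest verbatim
                out.append(template, pos, template.length());
                break;
            }
            // copy the literal text before the placeholder
            out.append(template, pos, start);
            String key = template.substring(start + 2, end);
            String value = params.get(key);
            // keep the original "${key}" when no value is known for it
            out.append(value != null ? value : template.substring(start, end + 1));
            pos = end + 1;
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String labels = "database=${database}&table=${table}";
        System.out.println(format(labels, Map.of("database", "db1", "table", "orders")));
        // prints: database=db1&table=orders
    }
}
```

One concrete pitfall this avoids: with `String.replaceAll` or `Matcher.appendReplacement`, a `$` or `\` inside the substituted value is interpreted as a group reference unless it is escaped with `Matcher.quoteReplacement`, whereas `StringBuilder.append` inserts the value literally.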


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7148:
URL: https://github.com/apache/inlong/pull/7148#discussion_r1092681823


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java:
##########
@@ -110,22 +114,59 @@ public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
             }
             throw ex;
         }
+
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        final String SEPARATOR = "%#%#%#";
+        JsonNode rootNode = null;
+        List<String> actualIdentifier = new ArrayList<>();
+
+        try {
+            // for rowdata where identifier is not the first element, append identifier and SEPARATOR before it.
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String rawIdentifier = ((String) dirtyData).split(SEPARATOR)[0];

Review Comment:
   For RowData in the multiple sink, the dirty sink needs to know the identifier, so I pass in "identifier + DIRTY_SEPARATOR + rowData" to be parsed for records that lack an identifier. In the multiple-sink scenario every record lacks the identifier, so it is prepended on every call that passes a String to this method. This case only carries the database and table, so it is easier to process.
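The "identifier + SEPARATOR + payload" convention described above can be sketched as a small codec. `SEPARATOR` mirrors the `"%#%#%#"` literal in the diff; the class `DirtyIdentifierCodec` and its methods are hypothetical. Splitting with a limit of 2 is a design choice of this sketch, so a payload that happens to contain the separator is not cut apart.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical codec for the "identifier + SEPARATOR + payload" convention.
 * SEPARATOR mirrors the literal used in the diff; names are illustrative only.
 */
final class DirtyIdentifierCodec {

    static final String SEPARATOR = "%#%#%#";
    // Pattern.quote guards against regex metacharacters if the separator ever changes
    private static final Pattern SPLITTER = Pattern.compile(Pattern.quote(SEPARATOR));

    private DirtyIdentifierCodec() {
    }

    /** Prefixes the payload with its sink identifier, e.g. "db.table". */
    static String encode(String identifier, String payload) {
        return identifier + SEPARATOR + payload;
    }

    /**
     * Splits at the first separator only (limit 2). Returns {identifier, payload};
     * if no separator is present, the identifier is null and the whole string is the payload.
     */
    static String[] decode(String encoded) {
        String[] parts = SPLITTER.split(encoded, 2);
        if (parts.length < 2) {
            return new String[] {null, encoded};
        }
        return parts;
    }

    public static void main(String[] args) {
        String[] decoded = decode(encode("db1.orders", "{\"id\":1,\"price\":9.5}"));
        System.out.println(decoded[0]); // db1.orders
        System.out.println(decoded[1]); // {"id":1,"price":9.5}
    }
}
```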





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7148:
URL: https://github.com/apache/inlong/pull/7148#discussion_r1092680812


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -512,27 +524,58 @@ private void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e)
                 LOG.warn("Dirty sink failed", ex);
             }
         }
+
         metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
     }
 
     private void handleMultipleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
-        JsonNode rootNode;
+        JsonNode rootNode = null;
+        String database = null;
+        String table = null;
         try {
-            rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String[] arr = ((String) dirtyData).split("\\.");

Review Comment:
   For RowData in the multiple sink, the dirty sink needs to know the identifier, so I pass in "identifier + DIRTY_SEPARATOR + rowData" to be parsed. In the multiple-sink scenario every record lacks the identifier, so it is prepended on every call that passes a String to this method.
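If the String reaching `handleMultipleDirtyData` carries the payload after the identifier, as described above, a plain `split("\\.")` over the whole string also splits at every dot inside the payload (a decimal value, for example); whether that matters depends on how the truncated remainder of the hunk uses `arr`. A minimal sketch of an alternative, assuming the `"%#%#%#"` separator from the helper side: separate the payload first, then split only the identifier at its first dot. `DorisDirtyString` and its fields are hypothetical names.

```java
import java.util.Optional;

/**
 * Hypothetical parser for dirty strings of the form "database.table" + SEPARATOR + payload.
 * The payload is separated first, so dots inside it are never treated as delimiters.
 */
final class DorisDirtyString {

    // assumed to match the separator used on the DirtySinkHelper side
    static final String SEPARATOR = "%#%#%#";

    final String database;
    final String table;
    final String payload;

    private DorisDirtyString(String database, String table, String payload) {
        this.database = database;
        this.table = table;
        this.payload = payload;
    }

    /** Empty when the string does not carry a usable "database.table" prefix. */
    static Optional<DorisDirtyString> parse(String dirty) {
        int sep = dirty.indexOf(SEPARATOR);
        if (sep < 0) {
            return Optional.empty(); // no identifier was attached
        }
        String identifier = dirty.substring(0, sep);
        int dot = identifier.indexOf('.');
        if (dot <= 0 || dot == identifier.length() - 1) {
            return Optional.empty(); // missing database or table part
        }
        return Optional.of(new DorisDirtyString(
                identifier.substring(0, dot),
                identifier.substring(dot + 1),
                dirty.substring(sep + SEPARATOR.length())));
    }

    public static void main(String[] args) {
        String dirty = "db1.orders%#%#%#{\"price\":9.5}";
        parse(dirty).ifPresent(d -> System.out.println(d.database + " / " + d.table + " / " + d.payload));
        // prints: db1 / orders / {"price":9.5}
        System.out.println(dirty.split("\\.").length); // prints 3: the payload's dot is split too
    }
}
```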





[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7148:
URL: https://github.com/apache/inlong/pull/7148#discussion_r1092682121


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -130,8 +131,16 @@ public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
     }
 
     private boolean valid() {
-        return (s3Options.getBatchSize() > 0 && size >= s3Options.getBatchSize())
-                || batchBytes >= s3Options.getMaxBatchBytes();
+        // stash dirty data for at least a minute to avoid flushing too fast

Review Comment:
   OK, then don't merge this part for now... 





[GitHub] [inlong] Yizhou-Yang closed pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang closed pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy
URL: https://github.com/apache/inlong/pull/7148




[GitHub] [inlong] Yizhou-Yang commented on a diff in pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on code in PR #7148:
URL: https://github.com/apache/inlong/pull/7148#discussion_r1092680812


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -512,27 +524,58 @@ private void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e)
                 LOG.warn("Dirty sink failed", ex);
             }
         }
+
         metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
     }
 
     private void handleMultipleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
-        JsonNode rootNode;
+        JsonNode rootNode = null;
+        String database = null;
+        String table = null;
         try {
-            rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String[] arr = ((String) dirtyData).split("\\.");

Review Comment:
   For RowData in the multiple sink, the dirty sink needs to know the identifier, so I pass in "identifier + DIRTY_SEPARATOR + rowData" to be parsed for records that lack an identifier. In the multiple-sink scenario every record lacks the identifier, so it is prepended on every call that passes a String to this method.





[GitHub] [inlong] Yizhou-Yang commented on pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on PR #7148:
URL: https://github.com/apache/inlong/pull/7148#issuecomment-1413385236

   This PR was refactored and then merged.




[GitHub] [inlong] Yizhou-Yang commented on pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy

Posted by "Yizhou-Yang (via GitHub)" <gi...@apache.org>.
Yizhou-Yang commented on PR #7148:
URL: https://github.com/apache/inlong/pull/7148#issuecomment-1407280878

   This should be separated into 3 different bugfix patches and optimized further.
   I will close this PR later and split it into 3 different PRs.




[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy

Posted by "yunqingmoswu (via GitHub)" <gi...@apache.org>.
yunqingmoswu commented on code in PR #7148:
URL: https://github.com/apache/inlong/pull/7148#discussion_r1091499008


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java:
##########
@@ -110,22 +114,59 @@ public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
             }
             throw ex;
         }
+
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        final String SEPARATOR = "%#%#%#";
+        JsonNode rootNode = null;
+        List<String> actualIdentifier = new ArrayList<>();
+
+        try {
+            // for rowdata where identifier is not the first element, append identifier and SEPARATOR before it.
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String rawIdentifier = ((String) dirtyData).split(SEPARATOR)[0];

Review Comment:
   why do it like this?



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -130,8 +131,16 @@ public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
     }
 
     private boolean valid() {
-        return (s3Options.getBatchSize() > 0 && size >= s3Options.getBatchSize())
-                || batchBytes >= s3Options.getMaxBatchBytes();
+        // stash dirty data for at least a minute to avoid flushing too fast

Review Comment:
   The current archiving logic already supports archiving by time interval or by throughput, so the processing here is redundant and inelegant.
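As a sketch of the reviewer's point, a flush decision can combine the existing count and byte triggers with a time-interval trigger, instead of special-casing a one-minute stash inside `valid()`. `DirtyFlushPolicy` and its parameters are hypothetical, not the InLong `S3Options` API; unlike the original `valid()`, this sketch treats a threshold of 0 or less as disabled for every trigger, and the clock is passed in so the logic is testable.

```java
/**
 * Hypothetical flush policy for a batching dirty sink: flush when the record count,
 * the byte size, or the time since the last flush crosses its threshold.
 * A threshold <= 0 disables that trigger. Names are illustrative only.
 */
final class DirtyFlushPolicy {

    private final int batchSize;
    private final long maxBatchBytes;
    private final long flushIntervalMillis;

    private int size;
    private long batchBytes;
    private long lastFlushMillis;

    DirtyFlushPolicy(int batchSize, long maxBatchBytes, long flushIntervalMillis, long nowMillis) {
        this.batchSize = batchSize;
        this.maxBatchBytes = maxBatchBytes;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Records one buffered dirty record of the given serialized size. */
    void add(long bytes) {
        size++;
        batchBytes += bytes;
    }

    /** True when any enabled trigger fires and there is something buffered. */
    boolean shouldFlush(long nowMillis) {
        if (size == 0) {
            return false;
        }
        boolean bySize = batchSize > 0 && size >= batchSize;
        boolean byBytes = maxBatchBytes > 0 && batchBytes >= maxBatchBytes;
        boolean byTime = flushIntervalMillis > 0 && nowMillis - lastFlushMillis >= flushIntervalMillis;
        return bySize || byBytes || byTime;
    }

    /** Resets counters after the buffered records have been written out. */
    void onFlushed(long nowMillis) {
        size = 0;
        batchBytes = 0;
        lastFlushMillis = nowMillis;
    }
}
```

Note that a time check evaluated only when a new record arrives never fires while the stream is idle, so in a real sink the interval trigger is usually driven by a scheduled task or timer as well.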



##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -512,27 +524,58 @@ private void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e)
                 LOG.warn("Dirty sink failed", ex);
             }
         }
+
         metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
     }
 
     private void handleMultipleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
-        JsonNode rootNode;
+        JsonNode rootNode = null;
+        String database = null;
+        String table = null;
         try {
-            rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String[] arr = ((String) dirtyData).split("\\.");

Review Comment:
   Why do it like this?


