Posted to commits@inlong.apache.org by "yunqingmoswu (via GitHub)" <gi...@apache.org> on 2023/01/31 06:29:17 UTC

[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #7148: [INLONG-7146][Sort] Adjust dirty data multiple sink archive strategy

yunqingmoswu commented on code in PR #7148:
URL: https://github.com/apache/inlong/pull/7148#discussion_r1091499008


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java:
##########
@@ -110,22 +114,59 @@ public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
             }
             throw ex;
         }
+
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        final String SEPARATOR = "%#%#%#";
+        JsonNode rootNode = null;
+        List<String> actualIdentifier = new ArrayList<>();
+
+        try {
+            // for rowdata where identifier is not the first element, append identifier and SEPARATOR before it.
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String rawIdentifier = ((String) dirtyData).split(SEPARATOR)[0];

Review Comment:
   Why do it like this?



##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java:
##########
@@ -130,8 +131,16 @@ public synchronized void invoke(DirtyData<T> dirtyData) throws Exception {
     }
 
     private boolean valid() {
-        return (s3Options.getBatchSize() > 0 && size >= s3Options.getBatchSize())
-                || batchBytes >= s3Options.getMaxBatchBytes();
+        // stash dirty data for at least a minute to avoid flushing too fast

Review Comment:
   The current archiving logic can already archive by time interval or by throughput, so the extra processing here is redundant and inelegant.
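
   A minimal sketch of the idea, not the actual S3DirtySink code: the time trigger and the throughput triggers are independent alternatives, so no extra "stash for at least a minute" guard is needed. The class name `DirtyFlushPolicy`, its fields, and `markFlushed` are hypothetical and only illustrate the shape; how the sink really schedules its interval-based flush is an assumption here.

```java
/**
 * Hypothetical flush policy for buffered dirty data.
 * All names are illustrative assumptions, not InLong APIs.
 */
final class DirtyFlushPolicy {

    private final int batchSize;        // flush once this many records are buffered (<= 0 disables)
    private final long maxBatchBytes;   // flush once this many bytes are buffered
    private final long batchIntervalMs; // flush once this long has passed since the last flush (<= 0 disables)

    private long lastFlushMs;

    DirtyFlushPolicy(int batchSize, long maxBatchBytes, long batchIntervalMs, long nowMs) {
        this.batchSize = batchSize;
        this.maxBatchBytes = maxBatchBytes;
        this.batchIntervalMs = batchIntervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Returns true when any single trigger (count, bytes, or elapsed time) fires. */
    boolean shouldFlush(int size, long batchBytes, long nowMs) {
        if (size <= 0) {
            return false; // nothing buffered, nothing to archive
        }
        boolean byCount = batchSize > 0 && size >= batchSize;
        boolean byBytes = batchBytes >= maxBatchBytes;
        boolean byTime = batchIntervalMs > 0 && nowMs - lastFlushMs >= batchIntervalMs;
        return byCount || byBytes || byTime;
    }

    /** Records the time of the most recent flush so the interval restarts. */
    void markFlushed(long nowMs) {
        this.lastFlushMs = nowMs;
    }
}
```

   With a policy like this, a slow stream is still archived once the interval elapses, and a fast stream is bounded by the count and byte limits, which is why a separate minimum-hold check adds nothing.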



##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -512,27 +524,58 @@ private void handleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e)
                 LOG.warn("Dirty sink failed", ex);
             }
         }
+
         metricData.invokeDirty(1, dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
     }
 
     private void handleMultipleDirtyData(Object dirtyData, DirtyType dirtyType, Exception e) {
-        JsonNode rootNode;
+        JsonNode rootNode = null;
+        String database = null;
+        String table = null;
         try {
-            rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            if (dirtyData instanceof RowData) {
+                rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0));
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+            } else if (dirtyData instanceof String) {
+                // parse and remove the added identifier for string cases
+                String[] arr = ((String) dirtyData).split("\\.");

Review Comment:
   Why do it like this?


