Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/31 03:20:56 UTC

[GitHub] [inlong] thesumery commented on a diff in pull request #6302: [INLONG-6301][Sort] Doris connector add metrics

thesumery commented on code in PR #6302:
URL: https://github.com/apache/inlong/pull/6302#discussion_r1008999901


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -47,13 +58,17 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 
 /**
  * DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat}
  * It is used in the multiple sink scenario, in this scenario, we directly convert the data format by
  * 'sink.multiple.format' in the data stream to doris json that is used to load
  */
-public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
+public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T>

Review Comment:
   Implementing CheckpointedFunction here has no effect.
   The Doris sink DataStream is produced by `OutputFormatProvider.of`, which wraps the format in an OutputFormatSinkFunction. OutputFormatSinkFunction does not implement CheckpointedFunction, so DorisDynamicSchemaOutputFormat#notifyCheckpoint and DorisDynamicSchemaOutputFormat#initializeState will never be called.
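
   To illustrate the point, here is a minimal, dependency-free sketch. The interfaces and classes below are simplified stand-ins that only borrow the Flink names (`CheckpointedFunction`, `OutputFormatSinkFunction`); their signatures are hypothetical and are not the real Flink API. The assumption being modelled is that the runtime only invokes checkpoint hooks on the function object it was handed, so a wrapper that does not implement the interface hides the hooks of the format it wraps.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointHookDemo {

    /** Simplified stand-in for Flink's CheckpointedFunction (hypothetical signatures). */
    interface CheckpointedFunction {
        void initializeState();
        void snapshotState(long checkpointId);
    }

    /** Simplified stand-in for an output format. */
    interface OutputFormat<T> {
        void writeRecord(T record);
    }

    /** Simplified stand-in for a sink function. */
    interface SinkFunction<T> {
        void invoke(T value);
    }

    /** A format that implements the checkpoint hooks, as the class in this PR does. */
    static class StatefulFormat implements OutputFormat<String>, CheckpointedFunction {
        final List<String> calls = new ArrayList<>();

        @Override
        public void writeRecord(String record) {
            calls.add("writeRecord:" + record);
        }

        @Override
        public void initializeState() {
            calls.add("initializeState");
        }

        @Override
        public void snapshotState(long checkpointId) {
            calls.add("snapshotState:" + checkpointId);
        }
    }

    /** Wrapper that forwards records but does NOT implement CheckpointedFunction. */
    static class OutputFormatSinkFunction<T> implements SinkFunction<T> {
        private final OutputFormat<T> format;

        OutputFormatSinkFunction(OutputFormat<T> format) {
            this.format = format;
        }

        @Override
        public void invoke(T value) {
            format.writeRecord(value);
        }
    }

    /** Assumed runtime behaviour: hooks run only if the sink itself implements the interface. */
    static void runCheckpoint(SinkFunction<?> sink, long checkpointId) {
        if (sink instanceof CheckpointedFunction) {
            CheckpointedFunction f = (CheckpointedFunction) sink;
            f.initializeState();
            f.snapshotState(checkpointId);
        }
    }

    /** Wraps the format, writes one record, triggers a checkpoint, returns the observed calls. */
    static List<String> demo() {
        StatefulFormat format = new StatefulFormat();
        SinkFunction<String> sink = new OutputFormatSinkFunction<>(format);
        sink.invoke("row-1");
        runCheckpoint(sink, 1L);
        return format.calls;
    }

    public static void main(String[] args) {
        // Prints [writeRecord:row-1]: the checkpoint hooks on the format never ran.
        System.out.println(demo());
    }
}
```

   In this sketch the hooks would only fire if the wrapper itself implemented the interface and delegated to the format, or if the sink were built as a function that implements it directly. Which of those fits this PR is a design decision for the author.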



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.