Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/26 14:16:30 UTC

[GitHub] [inlong] kuansix opened a new pull request, #6302: [INLONG-6301] doris connect add metric

kuansix opened a new pull request, #6302:
URL: https://github.com/apache/inlong/pull/6302

   ### Prepare a Pull Request
   
   
   - Fixes #6301 
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on pull request #6302: [INLONG-6301][Sort] Doris connector add metrics

Posted by GitBox <gi...@apache.org>.
gong commented on PR #6302:
URL: https://github.com/apache/inlong/pull/6302#issuecomment-1294509390

   Please refer to the JDBC connector: `CheckpointedFunction` can only be used with a function. Also, `getRuntimeContext` here will throw a null pointer exception, because the outputFormat can't get the runtimeContext.




[GitHub] [inlong] healchow commented on pull request #6302: [INLONG-6301][Sort] Doris connector add metrics

Posted by GitBox <gi...@apache.org>.
healchow commented on PR #6302:
URL: https://github.com/apache/inlong/pull/6302#issuecomment-1298004045

   Duplicated with #6346.




[GitHub] [inlong] kuansix commented on pull request #6302: [INLONG-6301][Sort] Doris connector add metrics

Posted by GitBox <gi...@apache.org>.
kuansix commented on PR #6302:
URL: https://github.com/apache/inlong/pull/6302#issuecomment-1292891566

   Your message has been received, thank you!




[GitHub] [inlong] thesumery commented on a diff in pull request #6302: [INLONG-6301][Sort] Doris connector add metrics

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6302:
URL: https://github.com/apache/inlong/pull/6302#discussion_r1008999901


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -47,13 +58,17 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 
 /**
  * DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat}
  * It is used in the multiple sink scenario, in this scenario, we directly convert the data format by
  * 'sink.multiple.format' in the data stream to doris json that is used to load
  */
-public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
+public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T>

Review Comment:
   Implementing CheckpointedFunction here is useless.
   The Doris sink DataStream is produced by `OutputFormatProvider.of`, and the OutputFormatSinkFunction used there does not implement CheckpointedFunction, so DorisDynamicSchemaOutputFormat#notifyCheckpoint and DorisDynamicSchemaOutputFormat#initializeState will never be called.
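
   The dispatch problem described in this comment can be illustrated with a minimal, self-contained sketch. Every type below (`Checkpointed`, `OutputFormat`, `SinkFunction`, `PlainWrapper`, `ForwardingWrapper`, `triggerCheckpoint`) is a hypothetical stand-in invented for illustration, not Flink's or InLong's real API. The sketch assumes the runtime only checks the object it holds directly, so a checkpoint hook on a wrapped object is reached only if the wrapper forwards it.

```java
import java.util.ArrayList;
import java.util.List;

// All types here are hypothetical stand-ins, NOT Flink's real classes.
class CheckpointDispatchSketch {

    /** Stand-in for a checkpoint-aware interface. */
    interface Checkpointed { void snapshotState(); }

    /** Stand-in for an output format that only knows how to write records. */
    interface OutputFormat { void writeRecord(String record); }

    /** Stand-in for the function object the runtime actually holds. */
    interface SinkFunction { void invoke(String record); }

    /** An output format that also declares a checkpoint hook, as in the diff. */
    static class MetricOutputFormat implements OutputFormat, Checkpointed {
        final List<String> written = new ArrayList<>();
        boolean snapshotCalled = false;

        @Override public void writeRecord(String record) { written.add(record); }
        @Override public void snapshotState() { snapshotCalled = true; }
    }

    /** Wrapper that forwards records only; it is not checkpoint-aware. */
    static class PlainWrapper implements SinkFunction {
        private final OutputFormat format;
        PlainWrapper(OutputFormat format) { this.format = format; }
        @Override public void invoke(String record) { format.writeRecord(record); }
    }

    /** Wrapper that is checkpoint-aware and forwards the hook to the format. */
    static class ForwardingWrapper implements SinkFunction, Checkpointed {
        private final OutputFormat format;
        ForwardingWrapper(OutputFormat format) { this.format = format; }
        @Override public void invoke(String record) { format.writeRecord(record); }
        @Override public void snapshotState() {
            if (format instanceof Checkpointed) {
                ((Checkpointed) format).snapshotState();
            }
        }
    }

    /** Simulated runtime: inspects only the function it was given. */
    static void triggerCheckpoint(SinkFunction function) {
        if (function instanceof Checkpointed) {
            ((Checkpointed) function).snapshotState();
        }
    }

    public static void main(String[] args) {
        MetricOutputFormat behindPlain = new MetricOutputFormat();
        triggerCheckpoint(new PlainWrapper(behindPlain));
        System.out.println("plain wrapper: snapshotCalled=" + behindPlain.snapshotCalled);

        MetricOutputFormat behindForwarding = new MetricOutputFormat();
        triggerCheckpoint(new ForwardingWrapper(behindForwarding));
        System.out.println("forwarding wrapper: snapshotCalled=" + behindForwarding.snapshotCalled);
    }
}
```

   In this sketch the hook behind `PlainWrapper` is never reached, while the one behind `ForwardingWrapper` is. Whether the real connector should solve this with a forwarding sink function or some other mechanism is a design decision for the PR, not something this sketch settles.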





[GitHub] [inlong] healchow closed pull request #6302: [INLONG-6301][Sort] Doris connector add metrics

Posted by GitBox <gi...@apache.org>.
healchow closed pull request #6302: [INLONG-6301][Sort] Doris connector add metrics
URL: https://github.com/apache/inlong/pull/6302




[GitHub] [inlong] healchow commented on a diff in pull request #6302: [INLONG-6301][Sort] Doris connector add metrics

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6302:
URL: https://github.com/apache/inlong/pull/6302#discussion_r1006945561


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.doris.table;
 
+import com.google.gson.Gson;

Review Comment:
   Is this dependency needed?





[GitHub] [inlong] EMsnap commented on a diff in pull request #6302: [INLONG-6301][Sort] Doris connector add metrics

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #6302:
URL: https://github.com/apache/inlong/pull/6302#discussion_r1006349511


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -298,6 +333,37 @@ private String getBackend() throws IOException {
         }
     }
 
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        Gson gson = new Gson();

Review Comment:
   Is there any way to reuse this?





[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6302: [INLONG-6301][Sort] Doris connector add metrics

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6302:
URL: https://github.com/apache/inlong/pull/6302#discussion_r1006350518


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java:
##########
@@ -92,14 +96,16 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                 .setExecutionOptions(executionOptions)
                 .setDatabasePattern(databasePattern)
                 .setTablePattern(tablePattern)
-                .setDynamicSchemaFormat(sinkMultipleFormat);
+                .setDynamicSchemaFormat(sinkMultipleFormat)

Review Comment:
   The metrics are also required for the normal sink.



##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -124,6 +149,11 @@ public void open(int taskNumber, int numTasks) throws IOException {
                 executionOptions.getStreamLoadProp());
         jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat)
                 DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+        this.numPendingRequests = new AtomicLong(0);

Review Comment:
   Maybe it is not necessary here?



##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -259,10 +290,14 @@ public synchronized void flush() throws IOException {
         for (Entry<String, List> kvs : batchMap.entrySet()) {
             load(kvs.getKey(), OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
         }
+        numPendingRequests.set(0);
     }
 
     private void load(String tableIdentifier, String result) throws IOException {
         String[] tableWithDb = tableIdentifier.split("\\.");
+        if (metricData != null) {
+            metricData.invokeWithEstimate(result);
+        }

Review Comment:
   Is the dirty metrics statistic also required?



##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -298,6 +333,37 @@ private String getBackend() throws IOException {
         }
     }
 
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        Gson gson = new Gson();
+        LOG.info("snapshotState begin, context is:{}", gson.toJson(functionSnapshotContext));
+        while (numPendingRequests.get() != 0) {
+            flush();
+        }
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+        LOG.info("snapshotState end, context is:{}", gson.toJson(functionSnapshotContext));
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        Gson gson = new Gson();
+        LOG.info("initializeState begin, context is:{}", gson.toJson(context));

Review Comment:
   It is recommended to use the existing JSON handling rather than 'Gson'. Also, is the logging here necessary?


