Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/26 07:43:33 UTC

[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2888: [Core][Metrics] Add Seatunnel Metrics module

Hisoka-X commented on code in PR #2888:
URL: https://github.com/apache/incubator-seatunnel/pull/2888#discussion_r979647707


##########
seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java:
##########
@@ -322,4 +326,61 @@ private void setCheckpoint() {
         }
     }
 
+    public StreamExecutionEnvironment creatMetricStreamEEnvironment() {
+
+        if(!config.hasPath(ConfigKeyName.Metric_Class)){
+            return StreamExecutionEnvironment.getExecutionEnvironment();
+        }
+        //构建flink-metrics参数 (English: build flink-metrics parameters)

Review Comment:
   Use English only.



##########
seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java:
##########
@@ -45,4 +45,9 @@ private ConfigKeyName() {
     public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
     public static final String STATE_BACKEND = "execution.state.backend";
     public static final String PLANNER = "execution.planner";
+    public static final String Metric_Interval = "execution.metrics.interval";

Review Comment:
   Use a unified name across Spark and Flink. These config names belong to `SeaTunnel`, not just to `Flink` or `Spark`.
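A minimal sketch of what a unified key could look like, assuming both engines read the same `execution.metrics.*` namespace. The two key strings already appear in this PR (`execution.metrics.class` in `fake_to_console.conf`, `execution.metrics.interval` in `ConfigKeyName`); the `MetricsConfigKeys` class and its `reporterClass` helper are hypothetical, and a plain `Map` stands in for the SeaTunnel `Config` object.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical engine-neutral metrics keys shared by the Flink and Spark
 * environments. Only the key strings come from the PR; the class is illustrative.
 */
final class MetricsConfigKeys {
    static final String METRICS_CLASS = "execution.metrics.class";
    static final String METRICS_INTERVAL = "execution.metrics.interval";

    private MetricsConfigKeys() {
    }

    /** Returns the configured reporter class name, whichever engine runs the job. */
    static Optional<String> reporterClass(Map<String, String> env) {
        return Optional.ofNullable(env.get(METRICS_CLASS));
    }
}
```

With one shared constant, `FlinkEnvironment` and `SparkEnvironment` would both check `MetricsConfigKeys.METRICS_CLASS` instead of `ConfigKeyName.Metric_Class` on one side and the literal `"spark.metrics.class"` on the other.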



##########
seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java:
##########
@@ -178,6 +179,22 @@ public static <T extends Object> T sinkProcess(SparkEnvironment environment, Bas
         }
         return sink.output(fromDs, environment);
     }
+
+    private SparkSession.Builder creatMetricBuilder(SparkSession.Builder builder){
+        if(config.hasPath("spark.metrics.class")){

Review Comment:
   Same problem: you should use a unified name here too.



##########
seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf:
##########
@@ -23,6 +23,11 @@ env {
   execution.parallelism = 1
   #execution.checkpoint.interval = 10000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+  execution.metrics.class = org.apache.seatunnel.metrics.core.reporter.PrometheusPushGatewayReporter

Review Comment:
   Don't add this to the default config, because other developers' environments may not have Prometheus.



##########
seatunnel-metrics/seatunnel-metrics-flink/src/main/java/org/apache/seatunnel/metrics/flink/SeatunnelMetricReporter.java:
##########
@@ -0,0 +1,104 @@
+package org.apache.seatunnel.metrics.flink;
+
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.seatunnel.metrics.core.*;
+import org.apache.seatunnel.metrics.core.reporter.MetricReporter;
+import org.apache.seatunnel.metrics.core.reporter.PrometheusPushGatewayReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * exports Flink metrics to Seatunnel
+ */
+public class SeatunnelMetricReporter extends AbstractSeatunnelReporter implements Scheduled {
+    private static final Logger log = LoggerFactory.getLogger(SeatunnelMetricReporter.class);

Review Comment:
   ```suggestion
       private static final Logger LOGGER = LoggerFactory.getLogger(SeatunnelMetricReporter.class);
   ```



##########
seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf:
##########
@@ -27,6 +27,10 @@ env {
   spark.executor.cores = 1
   spark.executor.memory = "1g"
   spark.master = local
+  spark.metrics.class = org.apache.seatunnel.metrics.core.reporter.PrometheusPushGatewayReporter

Review Comment:
   Same problem. You can show this config as a comment instead.



##########
seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java:
##########
@@ -322,4 +326,61 @@ private void setCheckpoint() {
         }
     }
 
+    public StreamExecutionEnvironment creatMetricStreamEEnvironment() {
+
+        if(!config.hasPath(ConfigKeyName.Metric_Class)){
+            return StreamExecutionEnvironment.getExecutionEnvironment();
+        }
+        //构建flink-metrics参数 (English: build flink-metrics parameters)
+        ConfigOption<String> REPORTERS_LIST =

Review Comment:
   Maybe you can create a new method called `initMetricConfig(Configuration config)` to encapsulate this logic.
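One possible shape for that helper, as a sketch only. The Flink-side keys (`metrics.reporters`, `metrics.reporter.seatunnel_reporter.*`) are the ones built in this diff; the SeaTunnel-side `execution.metrics.host/port/jobName` keys and the class name are assumptions, and a `Map<String, String>` stands in for Flink's `Configuration` so the example runs without Flink.

```java
import java.util.Map;

final class FlinkMetricConfigSketch {
    // Reporter name and Flink key prefix mirror the ones built in this diff.
    static final String REPORTER_NAME = "seatunnel_reporter";
    static final String REPORTER_PREFIX = "metrics.reporter." + REPORTER_NAME + ".";

    private FlinkMetricConfigSketch() {
    }

    /**
     * Hypothetical helper: registers the SeaTunnel reporter and forwards optional
     * settings. The Map stands in for org.apache.flink.configuration.Configuration.
     */
    static void initMetricConfig(Map<String, String> flinkConfig, Map<String, String> seatunnelEnv) {
        flinkConfig.put("metrics.reporters", REPORTER_NAME);
        flinkConfig.put(REPORTER_PREFIX + "class",
                "org.apache.seatunnel.metrics.flink.SeatunnelMetricReporter");
        // Optional settings are forwarded only when present (source keys are assumed).
        copyIfPresent(seatunnelEnv, "execution.metrics.interval", flinkConfig, REPORTER_PREFIX + "interval");
        copyIfPresent(seatunnelEnv, "execution.metrics.host", flinkConfig, REPORTER_PREFIX + "host");
        copyIfPresent(seatunnelEnv, "execution.metrics.port", flinkConfig, REPORTER_PREFIX + "port");
        copyIfPresent(seatunnelEnv, "execution.metrics.jobName", flinkConfig, REPORTER_PREFIX + "jobName");
    }

    private static void copyIfPresent(Map<String, String> from, String fromKey,
                                      Map<String, String> to, String toKey) {
        String value = from.get(fromKey);
        if (value != null) {
            to.put(toKey, value);
        }
    }
}
```

In the real method the target would be a Flink `Configuration` populated with `setString(key, value)`, and the stream-environment factory method would call the helper only when a metrics class is configured.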



##########
seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/reporter/PrometheusPushGatewayReporter.java:
##########
@@ -0,0 +1,238 @@
+package org.apache.seatunnel.metrics.core.reporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.PushGateway;
+import org.apache.seatunnel.metrics.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A reporter which outputs measurements to PrometheusPushGateway
+ */
+public class PrometheusPushGatewayReporter implements MetricReporter {

Review Comment:
   Move this to `seatunnel-metrics-prometheus`.



##########
seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java:
##########
@@ -322,4 +326,61 @@ private void setCheckpoint() {
         }
     }
 
+    public StreamExecutionEnvironment creatMetricStreamEEnvironment() {

Review Comment:
   ```suggestion
       public StreamExecutionEnvironment createStreamEnvironment() {
   ```
   This method also runs for jobs that have no metrics config, so its name shouldn't mention metrics.



##########
seatunnel-metrics/seatunnel-metrics-flink/src/main/java/org/apache/seatunnel/metrics/flink/SeatunnelMetricReporter.java:
##########
@@ -0,0 +1,104 @@
+package org.apache.seatunnel.metrics.flink;
+
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.seatunnel.metrics.core.*;
+import org.apache.seatunnel.metrics.core.reporter.MetricReporter;
+import org.apache.seatunnel.metrics.core.reporter.PrometheusPushGatewayReporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * exports Flink metrics to Seatunnel
+ */
+public class SeatunnelMetricReporter extends AbstractSeatunnelReporter implements Scheduled {
+    private static final Logger log = LoggerFactory.getLogger(SeatunnelMetricReporter.class);
+    private MetricReporter reporter;
+    private String host;
+    private int port;
+    private String jobName;
+
+    @Override
+    public void open(MetricConfig metricConfig) {
+        MetricConfig config = metricConfig;
+        config.isEmpty();
+        host = config.getString("host","localhost");
+        port = config.getInteger("port",9091);
+        jobName = config.getString("jobName","flinkJob");
+        //config.
+        //String string = metricConfig.getString("name", "de");
+        //log.info("StreamMetricReporter init:{}", string);

Review Comment:
   Remove the useless code: the redundant `config` alias, the no-op `config.isEmpty()` call, and the commented-out lines.
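A sketch of `open()` with the dead code removed, keeping the PR's keys and defaults (`host`/`localhost`, `port`/`9091`, `jobName`/`flinkJob`). Flink's `MetricConfig` extends `java.util.Properties`, so plain `Properties` stands in for it here; the `ReporterSettings` holder is hypothetical, and in the reporter itself the three `getString`/`getInteger` calls are all `open()` needs.

```java
import java.util.Properties;

/** Hypothetical holder for the values SeatunnelMetricReporter#open() reads. */
final class ReporterSettings {
    final String host;
    final int port;
    final String jobName;

    private ReporterSettings(String host, int port, String jobName) {
        this.host = host;
        this.port = port;
        this.jobName = jobName;
    }

    /** Reads the three settings with the same keys and defaults as the PR. */
    static ReporterSettings from(Properties config) {
        String host = config.getProperty("host", "localhost");
        int port = Integer.parseInt(config.getProperty("port", "9091"));
        String jobName = config.getProperty("jobName", "flinkJob");
        return new ReporterSettings(host, port, jobName);
    }
}
```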



##########
seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java:
##########
@@ -322,4 +326,61 @@ private void setCheckpoint() {
         }
     }
 
+    public StreamExecutionEnvironment creatMetricStreamEEnvironment() {
+
+        if(!config.hasPath(ConfigKeyName.Metric_Class)){
+            return StreamExecutionEnvironment.getExecutionEnvironment();
+        }
+        //构建flink-metrics参数 (English: build flink-metrics parameters)
+        ConfigOption<String> REPORTERS_LIST =
+                key("metrics.reporters")
+                        .stringType()
+                        .noDefaultValue();
+
+        ConfigOption<String> REPORTER_CLASS =
+                key("metrics.reporter.seatunnel_reporter.class")
+                        .stringType()
+                        .noDefaultValue();
+        ConfigOption<Duration> REPORTER_INTERVAL =
+                key("metrics.reporter.seatunnel_reporter.interval")
+                        .durationType()
+                        .defaultValue(Duration.ofSeconds(10));
+
+        ConfigOption<String> REPORTER_CONFIG_PORT =
+                key("metrics.reporter.seatunnel_reporter.port")
+                        .stringType()
+                        .noDefaultValue();
+        ConfigOption<String> REPORTER_CONFIG_HOST =
+                key("metrics.reporter.seatunnel_reporter.host")
+                        .stringType()
+                        .noDefaultValue();
+        ConfigOption<String> REPORTER_CONFIG_JOB_NAME =
+                key("metrics.reporter.seatunnel_reporter.jobName")
+                        .stringType()
+                        .noDefaultValue();
+
+        Configuration seatunnel_reporter = new Configuration().set(REPORTERS_LIST, "seatunnel_reporter").set(REPORTER_CLASS, "org.apache.seatunnel.metrics.flink.SeatunnelMetricReporter");

Review Comment:
   Use lower camel case for fields, parameters, and local variables: change `seatunnel_reporter` to `seatunnelReporter`.



##########
seatunnel-metrics/seatunnel-metrics-core/pom.xml:
##########
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-metrics</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-metrics-core</artifactId>
+
+    <properties>
+        <prometheus.version>0.9.0</prometheus.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.prometheus</groupId>

Review Comment:
   Core can't contain Prometheus. You should create a new module called `seatunnel-metrics-prometheus` and implement the logic that pushes metrics to Prometheus there. The core should only contain the metrics interfaces and common code.
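A minimal sketch of that split, assuming the reporter class is chosen by name in config (as `execution.metrics.class` does in this PR). Core owns only the interface and a reflection-based loader, so it never references a Prometheus class. The `report(Map<String, Double>)` method, `MetricReporterLoader`, and `LoggingReporter` are hypothetical, since the real `MetricReporter` methods aren't shown in this diff.

```java
import java.util.Map;

/** Hypothetical core-side contract; the real interface's methods are not in this diff. */
interface MetricReporter {
    void report(Map<String, Double> gauges);
}

/** Core-side loader: depends only on the interface, never on a concrete reporter. */
final class MetricReporterLoader {
    private MetricReporterLoader() {
    }

    static MetricReporter load(String className) {
        try {
            return Class.forName(className)
                    .asSubclass(MetricReporter.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load metric reporter: " + className, e);
        }
    }
}

/** Stand-in for an implementation that would live in seatunnel-metrics-prometheus. */
final class LoggingReporter implements MetricReporter {
    int reports;

    @Override
    public void report(Map<String, Double> gauges) {
        reports++;
        gauges.forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

`PrometheusPushGatewayReporter` would then implement the interface inside `seatunnel-metrics-prometheus`, and only that module would depend on `io.prometheus`. `java.util.ServiceLoader` is an alternative if implementations should be discovered rather than named.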



##########
seatunnel-metrics/seatunnel-metrics-core/src/main/java/org/apache/seatunnel/metrics/core/MetricInfo.java:
##########
@@ -0,0 +1,52 @@
+package org.apache.seatunnel.metrics.core;
+
+import java.util.List;
+
+/** Stores all child-properties of a metric. */
+public class MetricInfo {
+    private String metricName;
+    private List<String> dimensionKeys;
+    private List<String> dimensionValues;
+    private String helpString;
+
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public String getHelpString() {
+        return helpString;
+    }
+
+    public List<String> getDimensionKeys() {
+        return dimensionKeys;
+    }
+
+    public List<String> getDimensionValues() {
+        return dimensionValues;
+    }
+    public MetricInfo(String metricName, String helpString, List<String> dimensionKeys, List<String> dimensionValues) {
+        this.metricName = metricName;
+        this.helpString = helpString;
+        this.dimensionKeys = dimensionKeys;
+        this.dimensionValues = dimensionValues;
+    }
+
+    @Override
+    public String toString() {
+        String lineSeparator = System.lineSeparator();
+        StringBuilder builder = new StringBuilder();
+        builder.append("metricName: ")
+                .append(this.metricName)
+                .append(lineSeparator);
+        builder.append("helpString: ")
+                .append(this.helpString)
+                .append(lineSeparator);
+        for(int i=0;i<this.dimensionKeys.size();i++){

Review Comment:
   The code needs to be formatted.
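For reference, the visible part of `toString()` formatted in the usual Java style (spaces around operators and after `for`, one chained call per line). The loop body is cut off in this diff, so the `key: value` line per dimension below is an assumption, and the static `describe` helper is hypothetical.

```java
import java.util.List;

/** Formatted sketch of MetricInfo#toString(); the loop body is assumed. */
final class MetricInfoFormatSketch {
    private MetricInfoFormatSketch() {
    }

    static String describe(String metricName, String helpString,
                           List<String> dimensionKeys, List<String> dimensionValues) {
        String lineSeparator = System.lineSeparator();
        StringBuilder builder = new StringBuilder();
        builder.append("metricName: ")
                .append(metricName)
                .append(lineSeparator);
        builder.append("helpString: ")
                .append(helpString)
                .append(lineSeparator);
        for (int i = 0; i < dimensionKeys.size(); i++) {
            // Assumed body: one "key: value" line per dimension.
            builder.append(dimensionKeys.get(i))
                    .append(": ")
                    .append(dimensionValues.get(i))
                    .append(lineSeparator);
        }
        return builder.toString();
    }
}
```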



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org