You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/04 06:21:01 UTC

[incubator-inlong] branch master updated: [INLONG-2805][DataProxy] Add stream config log report (#2806)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 39dd6f6  [INLONG-2805][DataProxy] Add stream config log report (#2806)
39dd6f6 is described below

commit 39dd6f60cd1a80b541cf17f7da2604dbb8079324
Author: baomingyu <ba...@163.com>
AuthorDate: Fri Mar 4 14:20:55 2022 +0800

    [INLONG-2805][DataProxy] Add stream config log report (#2806)
---
 .../agent/plugin/sources/reader/BinlogReader.java  |  39 +++-
 .../inlong/common/reporpter/AbstractReporter.java  |  57 ++----
 ...treamMetricInfo.java => ConfigLogTypeEnum.java} |  43 ++---
 .../reporpter/{ResponseType.java => Response.java} |  11 +-
 .../common/reporpter/StreamConfigLogMetric.java    | 134 ++++++++++++++
 .../common/reporpter/StreamConfigLogReporter.java  |   2 +-
 .../common/reporpter/StreamMetricReporter.java     |  40 ----
 .../common/reporpter/dto/StreamConfigLogInfo.java  |   3 +-
 .../common/metric/reporter/ReporterTest.java       |  17 +-
 inlong-dataproxy/conf/common.properties            |   7 +-
 .../inlong/dataproxy/consts/ConfigConstants.java   |   3 +
 .../apache/inlong/dataproxy/sink/PulsarSink.java   |  29 ++-
 .../dataproxy/sink/pulsar/PulsarClientService.java |  92 +++++++--
 ...java => InlongStreamConfigLogListResponse.java} |   4 +-
 ....java => InlongStreamConfigLogPageRequest.java} |  15 +-
 .../pojo/stream/InlongStreamConfigLogRequest.java  |   3 +-
 .../manager/dao/entity/StreamConfigLogEntity.java  |  14 +-
 .../manager/dao/entity/StreamMetricEntity.java     | 197 --------------------
 .../dao/mapper/StreamConfigLogEntityMapper.java    |  16 +-
 .../dao/mapper/StreamMetricEntityMapper.java       |  39 ----
 .../src/main/resources/generatorConfig.xml         |   8 -
 .../mappers/StreamConfigLogEntityMapper.xml        | 206 ++++-----------------
 .../resources/mappers/StreamMetricEntityMapper.xml | 204 --------------------
 .../service/core/StreamConfigLogService.java       |   6 +
 .../manager/service/core/StreamMetricService.java  |  26 ---
 .../core/impl/StreamConfigLogServiceImpl.java      |  38 +++-
 .../service/core/impl/StreamMetricServiceImpl.java |  67 -------
 .../manager-web/sql/apache_inlong_manager.sql      |  38 +---
 ...ller.java => StreamConfigLogWebController.java} |  29 +--
 .../openapi/StreamConfigLogController.java         |   6 +-
 30 files changed, 455 insertions(+), 938 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 1f25c78..052695b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -34,11 +34,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
 import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
+import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 
@@ -48,6 +52,7 @@ public class BinlogReader implements Reader {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReader.class);
 
+    private static final String COMPONENT_NAME = "BinlogReader";
     private static final String JOB_DATABASE_USER = "job.binlogJob.user";
     private static final String JOB_DATABASE_PASSWORD = "job.binlogJob.password";
     private static final String JOB_DATABASE_HOSTNAME = "job.binlogJob.hostname";
@@ -94,6 +99,12 @@ public class BinlogReader implements Reader {
     private JobProfile jobProfile;
     private static final Gson gson = new Gson();
 
+    private boolean enableReportConfigLog;
+    private StreamConfigLogMetric streamConfigLogMetric;
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+
     public BinlogReader() {
     }
 
@@ -140,6 +151,27 @@ public class BinlogReader implements Reader {
         binlogSnapshot = new BinlogSnapshotBase(offsetStoreFileName);
         binlogSnapshot.save(offset);
 
+        enableReportConfigLog =
+                Boolean.parseBoolean(jobConf.get(StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE,
+                "true"));
+
+        inlongGroupId = jobConf.get(CommonConstants.PROXY_INLONG_GROUP_ID,
+                CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID);
+        inlongStreamId = jobConf.get(CommonConstants.PROXY_INLONG_STREAM_ID,
+                CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID);
+
+        if (enableReportConfigLog) {
+            String reportConfigServerUrl = jobConf
+                    .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_SERVER_URL, "");
+            String reportConfigLogInterval = jobConf
+                    .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL, "60000");
+            String clientVersion = jobConf
+                    .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
+            streamConfigLogMetric = new StreamConfigLogMetric(COMPONENT_NAME,
+                    reportConfigServerUrl, Long.parseLong(reportConfigLogInterval),
+                    AgentUtils.getLocalIp(), clientVersion);
+        }
+
         Properties props = getEngineProps();
 
         DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
@@ -162,8 +194,11 @@ public class BinlogReader implements Reader {
             })
             .using((success, message, error) -> {
                 if (!success) {
-                    LOGGER.error("binlog job with jobConf {} has "
-                        + "error {}", jobConf.getInstanceId(), message, error);
+                    LOGGER.error("binlog job with jobConf {} has " + "error {}",
+                            jobConf.getInstanceId(), message, error);
+                    streamConfigLogMetric
+                            .updateConfigLog(inlongGroupId, inlongStreamId, "DBConfig",
+                                    ConfigLogTypeEnum.ERROR, error == null ? "" : error.toString());
                 }
             }).build();
 
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
index e831675..6324eaa 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.common.reporpter;
 
 import com.google.gson.Gson;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -38,7 +36,7 @@ import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AbstractReporter<T, R> {
+public class AbstractReporter<T> {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(AbstractReporter.class);
 
@@ -54,7 +52,7 @@ public class AbstractReporter<T, R> {
 
     private static CloseableHttpClient httpClient;
 
-    private final Class<?> clazz = ResponseType.class;
+    private final Class<?> clazz = Response.class;
 
     private ThreadPoolExecutor pool;
 
@@ -96,25 +94,26 @@ public class AbstractReporter<T, R> {
         this.httpClient = httpClient;
     }
 
-    public R syncReportData(T data, String serverUrl) throws Exception {
+    public Response syncReportData(T data, String serverUrl) throws Exception {
+        if (StringUtils.isEmpty(serverUrl)) {
+            LOGGER.warn("Report config log server url is empty, so config log can not be "
+                    + "reported!");
+            return null;
+        }
         HttpPost httpPost = new HttpPost(serverUrl);
         try {
             StringEntity stringEntity = new StringEntity(gson.toJson(data));
             stringEntity.setContentType(AGENT_HTTP_APPLICATION_JSON);
             httpPost.setEntity(stringEntity);
             String returnStr = executeHttpPost(httpPost);
-            ResponseType<R> re = parse(returnStr);
-            if (re != null) {
-                return re.getResponse();
-            }
+            return parse(returnStr);
         } catch (Exception e) {
             LOGGER.error("syncReportData has exception e = {}", e);
             throw e;
         }
-        return null;
     }
 
-    public R syncReportData(T data) throws Exception {
+    public Response syncReportData(T data) throws Exception {
         return this.syncReportData(data, serverUrl);
     }
 
@@ -126,8 +125,8 @@ public class AbstractReporter<T, R> {
         return EntityUtils.toString(response.getEntity());
     }
 
-    public Future<R>  asyncReportData(T data, String serverUrl) {
-        CompletableFuture<R> completableFuture = new CompletableFuture<>();
+    public Future<Response>  asyncReportData(T data, String serverUrl) {
+        CompletableFuture<Response> completableFuture = new CompletableFuture<>();
 
         if (pool != null) {
             pool.execute(new RunTask(completableFuture, data, serverUrl));
@@ -138,50 +137,28 @@ public class AbstractReporter<T, R> {
         return completableFuture;
     }
 
-    public Future<R> asyncReportData(T data) {
+    public Future<Response> asyncReportData(T data) {
         return asyncReportData(data, serverUrl);
     }
 
-    public ResponseType<R> parse(String json) throws Exception {
+    public Response parse(String json) throws Exception {
 
         if (StringUtils.isEmpty(json)) {
             return null;
         }
 
-        ParameterizedType type = (ParameterizedType) this.getClass().getGenericSuperclass();
-
-        Type objectType = buildType(clazz, type.getActualTypeArguments());
-
-        return gson.fromJson(json, objectType);
-    }
-
-    private ParameterizedType buildType(final Class<?> raw, final Type... args) {
-
-        return new ParameterizedType() {
-
-            public Type getRawType() {
-                return raw;
-            }
-
-            public Type[] getActualTypeArguments() {
-                return args;
-            }
-
-            public Type getOwnerType() {
-                return null;
-            }
-        };
+        return gson.fromJson(json, Response.class);
     }
 
     class RunTask implements Runnable {
 
-        private CompletableFuture<R> completableFuture;
+        private CompletableFuture<Response> completableFuture;
 
         private T data;
 
         private String url;
 
-        public RunTask(CompletableFuture<R> completableFuture, T data, String url) {
+        public RunTask(CompletableFuture<Response> completableFuture, T data, String url) {
             this.completableFuture = completableFuture;
             this.data = data;
             this.url = url;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
similarity index 56%
rename from inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java
rename to inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
index b887d91..d7d654a 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
@@ -15,32 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.reporpter.dto;
+package org.apache.inlong.common.reporpter;
 
-import java.util.Date;
-import lombok.Getter;
-import lombok.Setter;
+import static java.util.Objects.requireNonNull;
 
-@Setter
-@Getter
-public class StreamMetricInfo {
+public enum ConfigLogTypeEnum {
 
-    private String ip;
+    NORMAL(0),ERROR(1);
 
-    private String version;
+    private int type;
 
-    private String componentName;
+    ConfigLogTypeEnum(int type) {
+        this.type = type;
+    }
 
-    private String metricName;
-
-    private Integer logType;
-
-    private Date reportTime;
-
-    private String metricInfo;
-
-    private String inlongGroupId;
-
-    private String inlongStreamId;
+    public static ConfigLogTypeEnum getOpType(int opType) {
+        requireNonNull(opType);
+        switch (opType) {
+            case 0:
+                return NORMAL;
+            case 1:
+                return ERROR;
+            default:
+                throw new RuntimeException("config log type doesn't exist");
+        }
+    }
 
+    public int getType() {
+        return type;
+    }
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
similarity index 88%
rename from inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java
rename to inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
index f0f1298..4f4d117 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
@@ -22,11 +22,8 @@ import lombok.Setter;
 
 @Setter
 @Getter
-public class ResponseType<T> {
-
-    private T response;
-
-    public T getResponse() {
-        return response;
-    }
+public class Response {
+    private boolean success;
+    private String errMsg;
+    private String data;
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
new file mode 100644
index 0000000..17f2eb2
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Instant;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamConfigLogMetric implements Runnable {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(StreamConfigLogMetric.class);
+
+    /*
+     * config name for log report
+     */
+    public static final String CONFIG_LOG_REPORT_ENABLE = "report.config.log.enable";
+    public static final String CONFIG_LOG_REPORT_SERVER_URL = "report.config.log.server.url";
+    public static final String CONFIG_LOG_REPORT_INTERVAL = "report.config.log.interval";
+    public static final String CONFIG_LOG_REPORT_CLIENT_VERSION = "report.config.log.client.version";
+    public static final String CONFIG_LOG_PULSAR_PRODUCER = "pulsar-producer";
+    public static final String CONFIG_LOG_PULSAR_CLIENT = "pulsar-client";
+
+    private StreamConfigLogReporter streamConfigLogReporter;
+
+    private String moduleName;
+
+    private String clientVersion;
+
+    private String localIp;
+
+    private long reportInterval;
+
+    public ConcurrentHashMap<String, StreamConfigLogInfo> dataCacheMap = new ConcurrentHashMap<>();
+
+    private static ScheduledExecutorService statExecutor =
+            Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
+                    .setNameFormat("StreamConfigLogMetric-Report")
+                    .setUncaughtExceptionHandler((t, e) ->
+                            LOGGER.error(t.getName() + " has an uncaught exception: ", e))
+                    .build());
+
+    public StreamConfigLogMetric(String moduleName, String serverUrl, long reportInterval,
+            String localIp, String clientVersion) {
+        this.streamConfigLogReporter = new StreamConfigLogReporter(serverUrl);
+        this.reportInterval = reportInterval;
+        this.moduleName = moduleName;
+        this.localIp = localIp;
+        this.clientVersion = clientVersion;
+        statExecutor.scheduleWithFixedDelay(this,
+                reportInterval, reportInterval, TimeUnit.MILLISECONDS);
+    }
+
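+    /**
+     * Creates or overwrites the cached log entry for one config and stamps it with the current time.
+     * @param inlongGroupId  inlong group id the log entry belongs to (part of the cache key)
+     * @param inlongStreamId inlong stream id the log entry belongs to (part of the cache key)
+     * @param configName name of the config being reported (part of the cache key)
+     * @param configLogTypeEnum type of the entry, NORMAL or ERROR; must not be null
+     * @param log log text stored as the entry's logInfo
+     */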
+    public void updateConfigLog(String inlongGroupId, String inlongStreamId, String configName,
+            ConfigLogTypeEnum configLogTypeEnum, String log) {
+        String key = moduleName + "-" +  inlongGroupId + "-" + inlongStreamId + "-" + configName;
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("updateConfigLog key = {}", key);
+        }
+        dataCacheMap.compute(key, (k, v) -> {
+            if (v == null) {
+                v = new StreamConfigLogInfo();
+            }
+            updateDataValue(v, inlongGroupId,
+                    inlongStreamId, configName, configLogTypeEnum, log);
+            return v;
+        });
+    }
+
+    private void updateDataValue(StreamConfigLogInfo streamConfigLogInfo,
+            String inlongGroupId, String inlongStreamId, String configName,
+            ConfigLogTypeEnum configLogTypeEnum, String log) {
+        streamConfigLogInfo.setComponentName(moduleName);
+        streamConfigLogInfo.setConfigName(configName);
+        streamConfigLogInfo.setInlongGroupId(inlongGroupId);
+        streamConfigLogInfo.setInlongStreamId(inlongStreamId);
+        streamConfigLogInfo.setIp(localIp);
+        streamConfigLogInfo.setVersion(clientVersion);
+        streamConfigLogInfo.setLogInfo(log);
+        streamConfigLogInfo.setReportTime(Instant.now().toEpochMilli());
+        streamConfigLogInfo.setLogType(configLogTypeEnum.getType());
+    }
+
+    public void run() {
+        try {
+            Set<Entry<String, StreamConfigLogInfo>> set = dataCacheMap.entrySet();
+            long currentTimeMills = Instant.now().toEpochMilli();
+            for (Entry<String, StreamConfigLogInfo> entry : set) {
+                StreamConfigLogInfo streamConfigLogInfo = entry.getValue();
+                if ((currentTimeMills - streamConfigLogInfo.getReportTime()) < reportInterval) {
+                    if (LOGGER.isDebugEnabled()) {
+                        LOGGER.debug("Report metric data config key = {}!", streamConfigLogInfo.getConfigName());
+                    }
+                    streamConfigLogReporter.asyncReportData(streamConfigLogInfo);
+                } else {
+                    dataCacheMap.remove(entry.getKey());
+                    LOGGER.info("Remove expired config key {}", entry.getKey());
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Report streamConfigLogMetric has exception = {}", e);
+        }
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
index 7e1e077..14009a7 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
@@ -21,7 +21,7 @@ import java.util.concurrent.RejectedExecutionHandler;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
 
-public class StreamConfigLogReporter extends AbstractReporter<StreamConfigLogInfo, String> {
+public class StreamConfigLogReporter extends AbstractReporter<StreamConfigLogInfo> {
 
     public StreamConfigLogReporter(String serverUrl) {
         super(serverUrl);
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java
deleted file mode 100644
index bd774b8..0000000
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import java.util.concurrent.RejectedExecutionHandler;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.inlong.common.reporpter.dto.StreamMetricInfo;
-
-public class StreamMetricReporter extends AbstractReporter<StreamMetricInfo, String> {
-
-    public StreamMetricReporter(String serverUrl) {
-        super(serverUrl);
-    }
-
-    public StreamMetricReporter(CloseableHttpClient httpClient, String serverUrl) {
-        super(httpClient, serverUrl);
-    }
-
-    public StreamMetricReporter(String serverUrl, int corePoolSize, int maximumPoolsize,
-            int syncSendQueueSize,
-            RejectedExecutionHandler rejectedExecutionHandler) {
-        super(serverUrl, corePoolSize, maximumPoolsize, syncSendQueueSize,
-                rejectedExecutionHandler);
-    }
-}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
index 0457b6c..3ce50e1 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.common.reporpter.dto;
 
-import java.util.Date;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -35,7 +34,7 @@ public class StreamConfigLogInfo {
 
     private Integer logType;
 
-    private Date reportTime;
+    private long reportTime;
 
     private String logInfo;
 
diff --git a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
index a05dd1a..e7fdc4f 100644
--- a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
+++ b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
@@ -20,11 +20,10 @@ package org.apache.inlong.common.metric.reporter;
 import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.Future;
+import org.apache.inlong.common.reporpter.Response;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.inlong.common.reporpter.StreamConfigLogReporter;
-import org.apache.inlong.common.reporpter.StreamMetricReporter;
 import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
-import org.apache.inlong.common.reporpter.dto.StreamMetricInfo;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -39,19 +38,7 @@ public class ReporterTest {
                 serverUrl);
         StreamConfigLogReporter spy = Mockito.spy(streamConfigLogReporter);
         StreamConfigLogInfo info = new StreamConfigLogInfo();
-        Future<String> future = spy.asyncReportData(info, serverUrl);
-        Assert.assertEquals(future.get(),null);
-    }
-
-    @Test
-    public void streamMetricReporterTest() throws Exception {
-        String serverUrl = "http://127.0.0.1:/8080/openapi/stream/metric/reportMetricStatus";
-        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
-        StreamMetricReporter streamMetricReporter = new StreamMetricReporter(httpClient,
-                serverUrl);
-        StreamMetricReporter spy = Mockito.spy(streamMetricReporter);
-        StreamMetricInfo info = new StreamMetricInfo();
-        Future<String> future = spy.asyncReportData(info, serverUrl);
+        Future<Response> future = spy.asyncReportData(info, serverUrl);
         Assert.assertEquals(future.get(),null);
     }
 }
diff --git a/inlong-dataproxy/conf/common.properties b/inlong-dataproxy/conf/common.properties
index bc18246..873ade9 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -32,4 +32,9 @@ prometheusHttpPort=8080
 # whether to enable audit
 audit.enable=true
 # audit proxy address
-audit.proxys=127.0.0.1:10081
\ No newline at end of file
+audit.proxys=127.0.0.1:10081
+
+# report config log
+report.config.log.enable=true
+report.config.log.server.url=http://127.0.0.1:8083/openapi/stream/log/reportConfigLogStatus
+report.config.log.interval=60000
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index d498b9c..e3b6d8e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -19,6 +19,8 @@ package org.apache.inlong.dataproxy.consts;
 
 public class ConfigConstants {
 
+    public static final String COMPONENT_NAME = "Inlong-DataProxy";
+
     public static final String CONFIG_PORT = "port";
 
     public static final String CONFIG_HOST = "host";
@@ -115,4 +117,5 @@ public class ConfigConstants {
     public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session_max_allowed_delayed_msg_count";
     public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
     public static final String RECOVER_THREAD_COUNT = "recover_thread_count";
+
 }
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 965c41e..10f230c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -37,6 +37,7 @@ import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
+import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
 import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
@@ -145,10 +146,8 @@ public class PulsarSink extends AbstractSink implements Configurable,
     private static final LogCounter logPrinterB = new LogCounter(10, 100000, 60 * 1000);
     private static final LogCounter logPrinterC = new LogCounter(10, 100000, 60 * 1000);
 
-    private static final String SINK_THREAD_NUM = "thread-num";
     private int eventQueueSize = 10000;
     private int badEventQueueSize = 10000;
-    private int threadNum;
 
     /*
      * send thread pool
@@ -165,6 +164,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
     private MonitorIndexExt monitorIndexExt;
     private SinkCounter sinkCounter;
 
+    private StreamConfigLogMetric streamConfigLogMetric;
+    private String localIp;
+
     /*
      *  metric
      */
@@ -172,6 +174,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
     private DataProxyMetricItemSet metricItemSet;
 
     private ConfigManager configManager;
+    private Map<String, String> commonProperties;
     private Map<String, String> topicProperties;
 
     private Map<String, String> pulsarCluster;
@@ -221,14 +224,32 @@ public class PulsarSink extends AbstractSink implements Configurable,
         topicProperties = configManager.getTopicProperties();
         pulsarCluster = configManager.getThirdPartyClusterUrl2Token();
         pulsarConfig = configManager.getThirdPartyClusterConfig(); //pulsar common config
+        commonProperties = configManager.getCommonProperties();
         pulsarClientService = new PulsarClientService(pulsarConfig);
+        boolean enableReportConfigLog =
+                Boolean.parseBoolean(commonProperties
+                        .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE,"true"));
+        localIp = NetworkUtils.getLocalIp();
+        if (enableReportConfigLog) {
+            String reportConfigServerUrl = commonProperties
+                    .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_SERVER_URL, "");
+            String reportConfigLogInterval = commonProperties
+                    .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL, "60000");
+            String clientVersion = commonProperties
+                    .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
+            streamConfigLogMetric = new StreamConfigLogMetric(ConfigConstants.COMPONENT_NAME,
+                    reportConfigServerUrl, Long.parseLong(reportConfigLogInterval),
+                    localIp, clientVersion);
+            pulsarClientService.setConfigLogMetric(streamConfigLogMetric);
+        }
+
         configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
             @Override
             public void update() {
                 if (pulsarClientService != null) {
                     diffSetPublish(pulsarClientService,
-                            new HashSet<String>(topicProperties.values()),
-                            new HashSet<String>(configManager.getTopicProperties().values()));
+                            new HashSet<>(topicProperties.values()),
+                            new HashSet<>(configManager.getTopicProperties().values()));
                 }
             }
         });
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index bc3dc29..7c518fa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -22,6 +22,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
 import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
+import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -79,6 +81,8 @@ public class PulsarClientService {
     public int pulsarConnectionsPreBroker;
     private String localIp = "127.0.0.1";
 
+    private StreamConfigLogMetric streamConfigLogMetric;
+
     /**
      * PulsarClientService
      *
@@ -108,6 +112,10 @@ public class PulsarClientService {
         localIp = NetworkUtils.getLocalIp();
     }
 
+    public void setConfigLogMetric(StreamConfigLogMetric streamConfigLogMetric) {
+        this.streamConfigLogMetric = streamConfigLogMetric; // optional: users null-check before reporting
+    }
+
     public void initCreateConnection(CreatePulsarClientCallBack callBack) {
         try {
             createConnection(callBack);
@@ -129,8 +137,10 @@ public class PulsarClientService {
     public boolean sendMessage(String topic, Event event,
                                SendMessageCallBack sendMessageCallBack, EventStat es) {
         TopicProducerInfo producer = null;
+        final String inlongStreamId = getInlongStreamId(event);
+        final String inlongGroupId = getInlongGroupId(event);
         try {
-            producer = getProducer(topic);
+            producer = getProducer(topic, inlongGroupId, inlongStreamId);
         } catch (Exception e) {
             if (logPrinterA.shouldPrint()) {
                 /*
@@ -140,6 +150,11 @@ public class PulsarClientService {
                  */
                 logger.error("Get producer failed!", e);
             }
+            if (streamConfigLogMetric != null) {
+                streamConfigLogMetric.updateConfigLog(inlongGroupId,
+                        inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+                        ConfigLogTypeEnum.ERROR, e.toString());
+            }
         }
         /*
          * If the producer is a null value,\ it means that the topic is not yet
@@ -159,13 +174,8 @@ public class PulsarClientService {
 
         Map<String, String> proMap = new HashMap<>();
         proMap.put("data_proxy_ip", localIp);
-        String streamId = "";
-        if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
-            streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
-        } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
-            streamId = event.getHeaders().get(AttributeConstants.INAME);
-        }
-        proMap.put(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+        proMap.put(inlongStreamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+
         TopicProducerInfo forCallBackP = producer;
         forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
                 .sendAsync().thenAccept((msgId) -> {
@@ -173,6 +183,11 @@ public class PulsarClientService {
             forCallBackP.setCanUseSend(true);
             sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
         }).exceptionally((e) -> {
+            if (streamConfigLogMetric != null) {
+                streamConfigLogMetric.updateConfigLog(inlongGroupId,
+                        inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+                        ConfigLogTypeEnum.ERROR, e.toString());
+            }
             forCallBackP.setCanUseSend(false);
             sendMessageCallBack.handleMessageSendException(topic, es, e);
             return null;
@@ -204,6 +219,12 @@ public class PulsarClientService {
                 callBack.handleCreateClientSuccess(info.getKey());
             } catch (PulsarClientException e) {
                 callBack.handleCreateClientException(info.getKey());
+                if (streamConfigLogMetric != null) {
+                    streamConfigLogMetric.updateConfigLog("DataProxyGlobal",
+                            "DataProxyGlobal",
+                            StreamConfigLogMetric.CONFIG_LOG_PULSAR_CLIENT,
+                            ConfigLogTypeEnum.ERROR, e.toString());
+                }
                 logger.error("create connnection error in metasink, "
                                 + "maybe pulsar master set error, please re-check.url{}, ex1 {}",
                         info.getKey(),
@@ -235,14 +256,15 @@ public class PulsarClientService {
         return builder.build();
     }
 
-    public List<TopicProducerInfo> initTopicProducer(String topic) {
+    public List<TopicProducerInfo> initTopicProducer(String topic, String inlongGroupId,
+            String inlongStreamId) {
         List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
             List<TopicProducerInfo> newList = null;
             if (pulsarClients != null) {
                 newList = new ArrayList<>();
                 for (PulsarClient pulsarClient : pulsarClients.values()) {
                     TopicProducerInfo info = new TopicProducerInfo(pulsarClient, topic);
-                    info.initProducer();
+                    info.initProducer(inlongGroupId, inlongStreamId);
                     if (info.isCanUseToSendMessage()) {
                         newList.add(info);
                     }
@@ -256,8 +278,14 @@ public class PulsarClientService {
         return producerInfoList;
     }
 
-    private TopicProducerInfo getProducer(String topic) {
-        List<TopicProducerInfo> producerList = initTopicProducer(topic);
+    public List<TopicProducerInfo> initTopicProducer(String topic) {
+        return initTopicProducer(topic, null, null); // old one-arg signature kept; ids passed as null
+    }
+
+    private TopicProducerInfo getProducer(String topic, String inlongGroupId,
+            String inlongStreamId) {
+        List<TopicProducerInfo> producerList =
+                initTopicProducer(topic, inlongGroupId, inlongStreamId);
         AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> {
             return new AtomicLong(0);
         });
@@ -371,6 +399,30 @@ public class PulsarClientService {
         }
     }
 
+    /**
+     * Resolves the inlong stream id from the event headers: STREAM_ID if present, else INAME.
+     * @param event event whose headers are inspected
+     * @return the header value of the first key found, or "" when neither key is present
+     */
+    private String getInlongStreamId(Event event) {
+        final Map<String, String> headers = event.getHeaders();
+        if (headers.containsKey(AttributeConstants.STREAM_ID)) {
+            return headers.get(AttributeConstants.STREAM_ID);
+        } else if (headers.containsKey(AttributeConstants.INAME)) {
+            return headers.get(AttributeConstants.INAME);
+        }
+        return "";
+    }
+
+    /**
+     * Get the inlong group id from the event headers.
+     * @param event event whose headers are read
+     * @return value stored under the GROUP_ID header key (null when the map returns none)
+     */
+    private String getInlongGroupId(Event event) {
+        return event.getHeaders().get(AttributeConstants.GROUP_ID);
+    }
+
     public void close() {
         destroyConnection();
     }
@@ -394,7 +446,7 @@ public class PulsarClientService {
             this.topic = topic;
         }
 
-        public void initProducer() {
+        public void initProducer(String inlongGroupId, String inlongStreamId) {
             try {
                 producer = pulsarClient.newProducer().sendTimeout(sendTimeout,
                         TimeUnit.MILLISECONDS)
@@ -408,11 +460,23 @@ public class PulsarClientService {
                         .create();
                 isFinishInit = true;
             } catch (PulsarClientException e) {
-                logger.error("create pulsar client has error e = {}", e);
+                logger.error("create pulsar client has error e = {} inlongGroupId = {}, "
+                        + "inlongStreamId= {}", e, inlongGroupId, inlongStreamId);
                 isFinishInit = false;
+                if (streamConfigLogMetric != null
+                        && StringUtils.isNotEmpty(inlongGroupId)
+                        && StringUtils.isNotEmpty(inlongStreamId)) {
+                    streamConfigLogMetric.updateConfigLog(inlongGroupId,
+                            inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_CLIENT,
+                            ConfigLogTypeEnum.ERROR, e.toString());
+                }
             }
         }
 
+        public void initProducer() {
+            initProducer(null, null); // null ids: failure is logged, no config log is reported
+        }
+
         public void setCanUseSend(Boolean isCanUseSend) {
             this.isCanUseSend = isCanUseSend;
             if (!isCanUseSend) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogListResponse.java
similarity index 94%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogListResponse.java
index 3262011..a71c0c6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogListResponse.java
@@ -25,8 +25,8 @@ import lombok.EqualsAndHashCode;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
-@ApiModel("Inlong stream config log")
-public class InlongStreamConfigLogRequest
+@ApiModel("Inlong stream config log query response")
+public class InlongStreamConfigLogListResponse
         extends InlongStreamBaseInfo {
 
     @ApiModelProperty(value = "ip")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogPageRequest.java
similarity index 77%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogPageRequest.java
index 3262011..3ca7587 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogPageRequest.java
@@ -22,12 +22,20 @@ import io.swagger.annotations.ApiModelProperty;
 import java.util.Date;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.beans.PageRequest;
+import org.springframework.format.annotation.DateTimeFormat;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
-@ApiModel("Inlong stream config log")
-public class InlongStreamConfigLogRequest
-        extends InlongStreamBaseInfo {
+@ApiModel("Inlong stream config log query request")
+public class InlongStreamConfigLogPageRequest
+        extends PageRequest {
+
+    @ApiModelProperty(value = "Inlong group id")
+    private String inlongGroupId;
+
+    @ApiModelProperty(value = "Inlong stream id")
+    private String inlongStreamId;
 
     @ApiModelProperty(value = "ip")
     private String ip;
@@ -45,6 +53,7 @@ public class InlongStreamConfigLogRequest
     private Integer logType;
 
     @ApiModelProperty(value = "report time")
+    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date reportTime;
 
     @ApiModelProperty(value = "long info")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
index 3262011..4a58d05 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.common.pojo.stream;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
@@ -45,7 +44,7 @@ public class InlongStreamConfigLogRequest
     private Integer logType;
 
     @ApiModelProperty(value = "report time")
-    private Date reportTime;
+    private long reportTime;
 
     @ApiModelProperty(value = "long info")
     private String logInfo;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamConfigLogEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamConfigLogEntity.java
index 0e0667b..dfbd0c8 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamConfigLogEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamConfigLogEntity.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
 import java.util.Date;
 
 public class StreamConfigLogEntity implements Serializable {
-    private Integer id;
 
     private String ip;
 
@@ -45,14 +44,6 @@ public class StreamConfigLogEntity implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    public Integer getId() {
-        return id;
-    }
-
-    public void setId(Integer id) {
-        this.id = id;
-    }
-
     public String getIp() {
         return ip;
     }
@@ -145,8 +136,7 @@ public class StreamConfigLogEntity implements Serializable {
             return false;
         }
         StreamConfigLogEntity other = (StreamConfigLogEntity) that;
-        return (this.getId() == null ? other.getId() == null : this.getId().equals(other.getId()))
-            && (this.getIp() == null ? other.getIp() == null : this.getIp().equals(other.getIp()))
+        return (this.getIp() == null ? other.getIp() == null : this.getIp().equals(other.getIp()))
             && (this.getVersion() == null ? other.getVersion() == null
                 : this.getVersion().equals(other.getVersion()))
             && (this.getInlongStreamId() == null ? other.getInlongStreamId() == null
@@ -171,7 +161,6 @@ public class StreamConfigLogEntity implements Serializable {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((getId() == null) ? 0 : getId().hashCode());
         result = prime * result + ((getIp() == null) ? 0 : getIp().hashCode());
         result = prime * result + ((getVersion() == null) ? 0 : getVersion().hashCode());
         result = prime * result + ((getInlongStreamId() == null) ? 0 : getInlongStreamId().hashCode());
@@ -191,7 +180,6 @@ public class StreamConfigLogEntity implements Serializable {
         sb.append(getClass().getSimpleName());
         sb.append(" [");
         sb.append("Hash = ").append(hashCode());
-        sb.append(", id=").append(id);
         sb.append(", ip=").append(ip);
         sb.append(", version=").append(version);
         sb.append(", inlongStreamId=").append(inlongStreamId);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamMetricEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamMetricEntity.java
deleted file mode 100644
index 2c9e6fd..0000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamMetricEntity.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import java.io.Serializable;
-import java.util.Date;
-
-public class StreamMetricEntity implements Serializable {
-    private Integer id;
-
-    private String ip;
-
-    private String version;
-
-    private String inlongStreamId;
-
-    private String inlongGroupId;
-
-    private String componentName;
-
-    private String metricName;
-
-    private Date reportTime;
-
-    private Date modifyTime;
-
-    private String metricInfo;
-
-    private static final long serialVersionUID = 1L;
-
-    public Integer getId() {
-        return id;
-    }
-
-    public void setId(Integer id) {
-        this.id = id;
-    }
-
-    public String getIp() {
-        return ip;
-    }
-
-    public void setIp(String ip) {
-        this.ip = ip == null ? null : ip.trim();
-    }
-
-    public String getVersion() {
-        return version;
-    }
-
-    public void setVersion(String version) {
-        this.version = version == null ? null : version.trim();
-    }
-
-    public String getInlongStreamId() {
-        return inlongStreamId;
-    }
-
-    public void setInlongStreamId(String inlongStreamId) {
-        this.inlongStreamId = inlongStreamId == null ? null : inlongStreamId.trim();
-    }
-
-    public String getInlongGroupId() {
-        return inlongGroupId;
-    }
-
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId == null ? null : inlongGroupId.trim();
-    }
-
-    public String getComponentName() {
-        return componentName;
-    }
-
-    public void setComponentName(String componentName) {
-        this.componentName = componentName == null ? null : componentName.trim();
-    }
-
-    public String getMetricName() {
-        return metricName;
-    }
-
-    public void setMetricName(String metricName) {
-        this.metricName = metricName == null ? null : metricName.trim();
-    }
-
-    public Date getReportTime() {
-        return reportTime;
-    }
-
-    public void setReportTime(Date reportTime) {
-        this.reportTime = reportTime;
-    }
-
-    public Date getModifyTime() {
-        return modifyTime;
-    }
-
-    public void setModifyTime(Date modifyTime) {
-        this.modifyTime = modifyTime;
-    }
-
-    public String getMetricInfo() {
-        return metricInfo;
-    }
-
-    public void setMetricInfo(String metricInfo) {
-        this.metricInfo = metricInfo == null ? null : metricInfo.trim();
-    }
-
-    @Override
-    public boolean equals(Object that) {
-        if (this == that) {
-            return true;
-        }
-        if (that == null) {
-            return false;
-        }
-        if (getClass() != that.getClass()) {
-            return false;
-        }
-        StreamMetricEntity other = (StreamMetricEntity) that;
-        return (this.getId() == null ? other.getId() == null
-                : this.getId().equals(other.getId()))
-            && (this.getIp() == null ? other.getIp() == null
-                : this.getIp().equals(other.getIp()))
-            && (this.getVersion() == null ? other.getVersion() == null
-                : this.getVersion().equals(other.getVersion()))
-            && (this.getInlongStreamId() == null ? other.getInlongStreamId() == null
-                : this.getInlongStreamId().equals(other.getInlongStreamId()))
-            && (this.getInlongGroupId() == null ? other.getInlongGroupId() == null
-                : this.getInlongGroupId().equals(other.getInlongGroupId()))
-            && (this.getComponentName() == null ? other.getComponentName() == null
-                : this.getComponentName().equals(other.getComponentName()))
-            && (this.getMetricName() == null ? other.getMetricName() == null
-                : this.getMetricName().equals(other.getMetricName()))
-            && (this.getReportTime() == null ? other.getReportTime() == null
-                : this.getReportTime().equals(other.getReportTime()))
-            && (this.getModifyTime() == null ? other.getModifyTime() == null
-                : this.getModifyTime().equals(other.getModifyTime()))
-            && (this.getMetricInfo() == null ? other.getMetricInfo() == null
-                : this.getMetricInfo().equals(other.getMetricInfo()));
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((getId() == null) ? 0 : getId().hashCode());
-        result = prime * result + ((getIp() == null) ? 0 : getIp().hashCode());
-        result = prime * result + ((getVersion() == null) ? 0 : getVersion().hashCode());
-        result = prime * result + ((getInlongStreamId() == null) ? 0 : getInlongStreamId().hashCode());
-        result = prime * result + ((getInlongGroupId() == null) ? 0 : getInlongGroupId().hashCode());
-        result = prime * result + ((getComponentName() == null) ? 0 : getComponentName().hashCode());
-        result = prime * result + ((getMetricName() == null) ? 0 : getMetricName().hashCode());
-        result = prime * result + ((getReportTime() == null) ? 0 : getReportTime().hashCode());
-        result = prime * result + ((getModifyTime() == null) ? 0 : getModifyTime().hashCode());
-        result = prime * result + ((getMetricInfo() == null) ? 0 : getMetricInfo().hashCode());
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(getClass().getSimpleName());
-        sb.append(" [");
-        sb.append("Hash = ").append(hashCode());
-        sb.append(", id=").append(id);
-        sb.append(", ip=").append(ip);
-        sb.append(", version=").append(version);
-        sb.append(", inlongStreamId=").append(inlongStreamId);
-        sb.append(", inlongGroupId=").append(inlongGroupId);
-        sb.append(", componentName=").append(componentName);
-        sb.append(", metricName=").append(metricName);
-        sb.append(", reportTime=").append(reportTime);
-        sb.append(", modifyTime=").append(modifyTime);
-        sb.append(", metricInfo=").append(metricInfo);
-        sb.append(", serialVersionUID=").append(serialVersionUID);
-        sb.append("]");
-        return sb.toString();
-    }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
index e4565ad..66b83f0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
@@ -18,22 +18,12 @@
 package org.apache.inlong.manager.dao.mapper;
 
 import java.util.List;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
 import org.apache.inlong.manager.dao.entity.StreamConfigLogEntity;
 
 public interface StreamConfigLogEntityMapper {
-    int deleteByPrimaryKey(Integer id);
 
-    int insertList(List<StreamConfigLogEntity> records);
+    int insertOrUpdateAll(List<StreamConfigLogEntity> records);
 
-    int insert(StreamConfigLogEntity record);
-
-    int insertSelective(StreamConfigLogEntity record);
-
-    StreamConfigLogEntity selectByPrimaryKey(Integer id);
-
-    int updateByPrimaryKeySelective(StreamConfigLogEntity record);
-
-    int updateByPrimaryKeyWithBLOBs(StreamConfigLogEntity record);
-
-    int updateByPrimaryKey(StreamConfigLogEntity record);
+    List<StreamConfigLogEntity> selectByCondition(InlongStreamConfigLogPageRequest request);
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamMetricEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamMetricEntityMapper.java
deleted file mode 100644
index 6b8ae52..0000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamMetricEntityMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.mapper;
-
-import java.util.List;
-import org.apache.inlong.manager.dao.entity.StreamMetricEntity;
-
-public interface StreamMetricEntityMapper {
-    int deleteByPrimaryKey(Integer id);
-
-    int insertList(List<StreamMetricEntity> records);
-
-    int insert(StreamMetricEntity record);
-
-    int insertSelective(StreamMetricEntity record);
-
-    StreamMetricEntity selectByPrimaryKey(Integer id);
-
-    int updateByPrimaryKeySelective(StreamMetricEntity record);
-
-    int updateByPrimaryKeyWithBLOBs(StreamMetricEntity record);
-
-    int updateByPrimaryKey(StreamMetricEntity record);
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index b44d175..c94568f 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -254,13 +254,6 @@
             enableCountByExample="false" enableDeleteByExample="false"
             enableSelectByExample="false" enableUpdateByExample="false"/>
 
-        <table tableName="stream_metric" domainObjectName="StreamMetricEntity"
-            enableSelectByPrimaryKey="true"
-            enableUpdateByPrimaryKey="true"
-            enableDeleteByPrimaryKey="true" enableInsert="true"
-            enableCountByExample="false" enableDeleteByExample="false"
-            enableSelectByExample="false" enableUpdateByExample="false"/>
-
         <table tableName="sort_source_config" domainObjectName="SortSourceConfigEntity"
                enableSelectByPrimaryKey="true"
                enableUpdateByPrimaryKey="true"
@@ -268,6 +261,5 @@
                enableCountByExample="false" enableDeleteByExample="false"
                enableSelectByExample="false" enableUpdateByExample="false"/>-->
 
-
     </context>
 </generatorConfiguration>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamConfigLogEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamConfigLogEntityMapper.xml
index da24f6a..6f281c5 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamConfigLogEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamConfigLogEntityMapper.xml
@@ -21,14 +21,13 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.inlong.manager.dao.mapper.StreamConfigLogEntityMapper">
   <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
-    <id column="id" jdbcType="INTEGER" property="id" />
-    <result column="ip" jdbcType="VARCHAR" property="ip" />
+    <id column="ip" jdbcType="VARCHAR" property="ip" />
+    <id column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
+    <id column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+    <id column="component_name" jdbcType="VARCHAR" property="componentName" />
+    <id column="config_name" jdbcType="VARCHAR" property="configName" />
+    <id column="log_type" jdbcType="INTEGER" property="logType" />
     <result column="version" jdbcType="VARCHAR" property="version" />
-    <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
-    <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
-    <result column="component_name" jdbcType="VARCHAR" property="componentName" />
-    <result column="config_name" jdbcType="VARCHAR" property="configName" />
-    <result column="log_type" jdbcType="INTEGER" property="logType" />
     <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
     <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
   </resultMap>
@@ -36,180 +35,47 @@
     <result column="log_info" jdbcType="LONGVARCHAR" property="logInfo" />
   </resultMap>
   <sql id="Base_Column_List">
-    id, ip, version, inlong_stream_id, inlong_group_id, component_name, config_name, 
+    ip, version, inlong_stream_id, inlong_group_id, component_name, config_name,
     log_type, report_time, modify_time
   </sql>
   <sql id="Blob_Column_List">
     log_info
   </sql>
-  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
-    select 
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from stream_config_log
-    where id = #{id,jdbcType=INTEGER}
-  </select>
-  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-    delete from stream_config_log
-    where id = #{id,jdbcType=INTEGER}
-  </delete>
-  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
-    insert into stream_config_log (id, ip, version, 
-      inlong_stream_id, inlong_group_id, component_name, 
-      config_name, log_type, report_time, 
-      modify_time, log_info)
-    values (#{id,jdbcType=INTEGER}, #{ip,jdbcType=VARCHAR}, #{version,jdbcType=VARCHAR}, 
-      #{inlongStreamId,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR}, #{componentName,jdbcType=VARCHAR}, 
-      #{configName,jdbcType=VARCHAR}, #{logType,jdbcType=INTEGER}, #{reportTime,jdbcType=TIMESTAMP}, 
-      #{modifyTime,jdbcType=TIMESTAMP}, #{logInfo,jdbcType=LONGVARCHAR})
-  </insert>
-  <insert id="insertList" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
+  <insert id="insertOrUpdateAll" parameterType="java.util.List">
     insert into stream_config_log (ip, version,
       inlong_stream_id, inlong_group_id, component_name,
-      config_name, log_type, report_time, log_info)
-    values
+      config_name, log_type, report_time, log_info) values
     <foreach collection="list" index="index" item="item" open="" close="" separator=",">
-     (#{item.ip,jdbcType=VARCHAR}, #{item.version,jdbcType=VARCHAR},
+      (#{item.ip,jdbcType=VARCHAR}, #{item.version,jdbcType=VARCHAR},
       #{item.inlongStreamId,jdbcType=VARCHAR}, #{item.inlongGroupId,jdbcType=VARCHAR}, #{item.componentName,jdbcType=VARCHAR},
       #{item.configName,jdbcType=VARCHAR}, #{item.logType,jdbcType=INTEGER}, #{item.reportTime,jdbcType=TIMESTAMP},
       #{item.logInfo,jdbcType=LONGVARCHAR})
     </foreach>
+    ON DUPLICATE key update report_time=values(report_time),log_info=values(log_info)
   </insert>
-  <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
-    insert into stream_config_log
-    <trim prefix="(" suffix=")" suffixOverrides=",">
-      <if test="id != null">
-        id,
-      </if>
-      <if test="ip != null">
-        ip,
-      </if>
-      <if test="version != null">
-        version,
-      </if>
-      <if test="inlongStreamId != null">
-        inlong_stream_id,
-      </if>
-      <if test="inlongGroupId != null">
-        inlong_group_id,
-      </if>
-      <if test="componentName != null">
-        component_name,
-      </if>
-      <if test="configName != null">
-        config_name,
-      </if>
-      <if test="logType != null">
-        log_type,
-      </if>
-      <if test="reportTime != null">
-        report_time,
-      </if>
-      <if test="modifyTime != null">
-        modify_time,
-      </if>
-      <if test="logInfo != null">
-        log_info,
-      </if>
-    </trim>
-    <trim prefix="values (" suffix=")" suffixOverrides=",">
-      <if test="id != null">
-        #{id,jdbcType=INTEGER},
-      </if>
-      <if test="ip != null">
-        #{ip,jdbcType=VARCHAR},
-      </if>
-      <if test="version != null">
-        #{version,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongStreamId != null">
-        #{inlongStreamId,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongGroupId != null">
-        #{inlongGroupId,jdbcType=VARCHAR},
-      </if>
-      <if test="componentName != null">
-        #{componentName,jdbcType=VARCHAR},
-      </if>
-      <if test="configName != null">
-        #{configName,jdbcType=VARCHAR},
-      </if>
-      <if test="logType != null">
-        #{logType,jdbcType=INTEGER},
-      </if>
-      <if test="reportTime != null">
-        #{reportTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="modifyTime != null">
-        #{modifyTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="logInfo != null">
-        #{logInfo,jdbcType=LONGVARCHAR},
-      </if>
-    </trim>
-  </insert>
-  <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
-    update stream_config_log
-    <set>
-      <if test="ip != null">
-        ip = #{ip,jdbcType=VARCHAR},
-      </if>
-      <if test="version != null">
-        version = #{version,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongStreamId != null">
-        inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongGroupId != null">
-        inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-      </if>
-      <if test="componentName != null">
-        component_name = #{componentName,jdbcType=VARCHAR},
-      </if>
-      <if test="configName != null">
-        config_name = #{configName,jdbcType=VARCHAR},
-      </if>
-      <if test="logType != null">
-        log_type = #{logType,jdbcType=INTEGER},
-      </if>
-      <if test="reportTime != null">
-        report_time = #{reportTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="modifyTime != null">
-        modify_time = #{modifyTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="logInfo != null">
-        log_info = #{logInfo,jdbcType=LONGVARCHAR},
-      </if>
-    </set>
-    where id = #{id,jdbcType=INTEGER}
-  </update>
-  <update id="updateByPrimaryKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
-    update stream_config_log
-    set ip = #{ip,jdbcType=VARCHAR},
-      version = #{version,jdbcType=VARCHAR},
-      inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-      inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-      component_name = #{componentName,jdbcType=VARCHAR},
-      config_name = #{configName,jdbcType=VARCHAR},
-      log_type = #{logType,jdbcType=INTEGER},
-      report_time = #{reportTime,jdbcType=TIMESTAMP},
-      modify_time = #{modifyTime,jdbcType=TIMESTAMP},
-      log_info = #{logInfo,jdbcType=LONGVARCHAR}
-    where id = #{id,jdbcType=INTEGER}
-  </update>
-  <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
-    update stream_config_log
-    set ip = #{ip,jdbcType=VARCHAR},
-      version = #{version,jdbcType=VARCHAR},
-      inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-      inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-      component_name = #{componentName,jdbcType=VARCHAR},
-      config_name = #{configName,jdbcType=VARCHAR},
-      log_type = #{logType,jdbcType=INTEGER},
-      report_time = #{reportTime,jdbcType=TIMESTAMP},
-      modify_time = #{modifyTime,jdbcType=TIMESTAMP}
-    where id = #{id,jdbcType=INTEGER}
-  </update>
+  <select id="selectByCondition" resultMap="ResultMapWithBLOBs"
+      parameterType="org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest">
+    select
+    <include refid="Base_Column_List"/>
+    ,
+    <include refid="Blob_Column_List" />
+    from stream_config_log
+    where log_type = 1 <!-- log_type 1 = error; every filter below is optional and matches its own column -->
+    <if test="reportTime != null">
+      and report_time > #{reportTime, jdbcType=TIMESTAMP}
+    </if>
+    <if test="inlongStreamId != null and inlongStreamId != ''">
+      and inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}
+    </if>
+    <if test="inlongGroupId != null and inlongGroupId != ''">
+      and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+    </if>
+    <if test="componentName != null and componentName != ''">
+      and component_name = #{componentName, jdbcType=VARCHAR}
+    </if>
+    <if test="configName != null and configName != ''">
+      and config_name = #{configName, jdbcType=VARCHAR}
+    </if>
+    order by modify_time desc
+  </select>
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamMetricEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamMetricEntityMapper.xml
deleted file mode 100644
index 1c6c972..0000000
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamMetricEntityMapper.xml
+++ /dev/null
@@ -1,204 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamMetricEntityMapper">
-  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
-    <id column="id" jdbcType="INTEGER" property="id" />
-    <result column="ip" jdbcType="VARCHAR" property="ip" />
-    <result column="version" jdbcType="VARCHAR" property="version" />
-    <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
-    <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
-    <result column="component_name" jdbcType="VARCHAR" property="componentName" />
-    <result column="metric_name" jdbcType="VARCHAR" property="metricName" />
-    <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
-    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
-  </resultMap>
-  <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
-    <result column="metric_info" jdbcType="LONGVARCHAR" property="metricInfo" />
-  </resultMap>
-  <sql id="Base_Column_List">
-    id, ip, version, inlong_stream_id, inlong_group_id, component_name, metric_name, 
-    report_time, modify_time
-  </sql>
-  <sql id="Blob_Column_List">
-    metric_info
-  </sql>
-  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
-    select 
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from stream_metric
-    where id = #{id,jdbcType=INTEGER}
-  </select>
-  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-    delete from stream_metric
-    where id = #{id,jdbcType=INTEGER}
-  </delete>
-  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
-    insert into stream_metric (id, ip, version, 
-      inlong_stream_id, inlong_group_id, component_name, 
-      metric_name, report_time, modify_time, 
-      metric_info)
-    values (#{id,jdbcType=INTEGER}, #{ip,jdbcType=VARCHAR}, #{version,jdbcType=VARCHAR}, 
-      #{inlongStreamId,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR}, #{componentName,jdbcType=VARCHAR}, 
-      #{metricName,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP}, 
-      #{metricInfo,jdbcType=LONGVARCHAR})
-  </insert>
-  <insert id="insertList" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
-    insert into stream_metric (id, ip, version,
-      inlong_stream_id, inlong_group_id, component_name,
-      metric_name, report_time,
-      metric_info)
-    values
-    <foreach collection="list" index="index" item="item" open="" close="" separator=",">
-      (#{item.ip,jdbcType=VARCHAR}, #{item.version,jdbcType=VARCHAR},
-      #{item.inlongStreamId,jdbcType=VARCHAR}, #{item.inlongGroupId,jdbcType=VARCHAR}, #{item.componentName,jdbcType=VARCHAR},
-      #{item.metricName,jdbcType=VARCHAR}, #{item.reportTime,jdbcType=TIMESTAMP},
-      #{item.metricInfo,jdbcType=LONGVARCHAR})
-    </foreach>
-  </insert>
-  <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
-    insert into stream_metric
-    <trim prefix="(" suffix=")" suffixOverrides=",">
-      <if test="id != null">
-        id,
-      </if>
-      <if test="ip != null">
-        ip,
-      </if>
-      <if test="version != null">
-        version,
-      </if>
-      <if test="inlongStreamId != null">
-        inlong_stream_id,
-      </if>
-      <if test="inlongGroupId != null">
-        inlong_group_id,
-      </if>
-      <if test="componentName != null">
-        component_name,
-      </if>
-      <if test="metricName != null">
-        metric_name,
-      </if>
-      <if test="reportTime != null">
-        report_time,
-      </if>
-      <if test="modifyTime != null">
-        modify_time,
-      </if>
-      <if test="metricInfo != null">
-        metric_info,
-      </if>
-    </trim>
-    <trim prefix="values (" suffix=")" suffixOverrides=",">
-      <if test="id != null">
-        #{id,jdbcType=INTEGER},
-      </if>
-      <if test="ip != null">
-        #{ip,jdbcType=VARCHAR},
-      </if>
-      <if test="version != null">
-        #{version,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongStreamId != null">
-        #{inlongStreamId,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongGroupId != null">
-        #{inlongGroupId,jdbcType=VARCHAR},
-      </if>
-      <if test="componentName != null">
-        #{componentName,jdbcType=VARCHAR},
-      </if>
-      <if test="metricName != null">
-        #{metricName,jdbcType=VARCHAR},
-      </if>
-      <if test="reportTime != null">
-        #{reportTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="modifyTime != null">
-        #{modifyTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="metricInfo != null">
-        #{metricInfo,jdbcType=LONGVARCHAR},
-      </if>
-    </trim>
-  </insert>
-  <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
-    update stream_metric
-    <set>
-      <if test="ip != null">
-        ip = #{ip,jdbcType=VARCHAR},
-      </if>
-      <if test="version != null">
-        version = #{version,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongStreamId != null">
-        inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongGroupId != null">
-        inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-      </if>
-      <if test="componentName != null">
-        component_name = #{componentName,jdbcType=VARCHAR},
-      </if>
-      <if test="metricName != null">
-        metric_name = #{metricName,jdbcType=VARCHAR},
-      </if>
-      <if test="reportTime != null">
-        report_time = #{reportTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="modifyTime != null">
-        modify_time = #{modifyTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="metricInfo != null">
-        metric_info = #{metricInfo,jdbcType=LONGVARCHAR},
-      </if>
-    </set>
-    where id = #{id,jdbcType=INTEGER}
-  </update>
-  <update id="updateByPrimaryKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
-    update stream_metric
-    set ip = #{ip,jdbcType=VARCHAR},
-      version = #{version,jdbcType=VARCHAR},
-      inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-      inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-      component_name = #{componentName,jdbcType=VARCHAR},
-      metric_name = #{metricName,jdbcType=VARCHAR},
-      report_time = #{reportTime,jdbcType=TIMESTAMP},
-      modify_time = #{modifyTime,jdbcType=TIMESTAMP},
-      metric_info = #{metricInfo,jdbcType=LONGVARCHAR}
-    where id = #{id,jdbcType=INTEGER}
-  </update>
-  <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
-    update stream_metric
-    set ip = #{ip,jdbcType=VARCHAR},
-      version = #{version,jdbcType=VARCHAR},
-      inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-      inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-      component_name = #{componentName,jdbcType=VARCHAR},
-      metric_name = #{metricName,jdbcType=VARCHAR},
-      report_time = #{reportTime,jdbcType=TIMESTAMP},
-      modify_time = #{modifyTime,jdbcType=TIMESTAMP}
-    where id = #{id,jdbcType=INTEGER}
-  </update>
-</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamConfigLogService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamConfigLogService.java
index 16ccb70..79369c3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamConfigLogService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamConfigLogService.java
@@ -17,10 +17,16 @@
 
 package org.apache.inlong.manager.service.core;
 
+import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
 
 public interface StreamConfigLogService {
 
     String reportConfigLog(InlongStreamConfigLogRequest request);
 
+    PageInfo<InlongStreamConfigLogListResponse> listByCondition(
+            InlongStreamConfigLogPageRequest request);
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamMetricService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamMetricService.java
deleted file mode 100644
index 85ce1e5..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamMetricService.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core;
-
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamMetricRequest;
-
-public interface StreamMetricService {
-
-    String reportMetric(InlongStreamMetricRequest entity);
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
index 9978637..5b6726b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
@@ -17,8 +17,19 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
 import java.util.List;
+import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamConfigLogEntity;
 import org.apache.inlong.manager.dao.mapper.StreamConfigLogEntityMapper;
 import org.apache.inlong.manager.service.core.StreamConfigLogService;
@@ -46,8 +57,31 @@ public class StreamConfigLogServiceImpl extends AbstractService<StreamConfigLogE
         }
     }
 
+    /** Returns one page of stream config logs matching the request; the inlong group id must not be null. */
+    @Override
+    public PageInfo<InlongStreamConfigLogListResponse> listByCondition(
+            InlongStreamConfigLogPageRequest request) {
+        LOGGER.debug("begin to list stream config log page by {}", request);
+        Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+        PageHelper.startPage(request.getPageNum(), request.getPageSize());
+        // NOTE(review): without a report time the window defaults to the last 5 milliseconds - confirm the unit.
+        if (request.getReportTime() == null) {
+            Instant instant = Instant.now().minus(Duration.ofMillis(5));
+            request.setReportTime(new Date(instant.toEpochMilli()));
+        }
+        Page<StreamConfigLogEntity> entityPage = (Page<StreamConfigLogEntity>)
+                streamConfigLogEntityMapper.selectByCondition(request);
+        List<InlongStreamConfigLogListResponse> detailList = CommonBeanUtils
+                .copyListProperties(entityPage, InlongStreamConfigLogListResponse::new);
+
+        // The PageInfo wraps the converted list, so the total is carried over from the queried entity page.
+        PageInfo<InlongStreamConfigLogListResponse> pageInfo = new PageInfo<>(detailList);
+        pageInfo.setTotal(entityPage.getTotal());
+        return pageInfo;
+    }
+
     public boolean batchInsertEntities(List<StreamConfigLogEntity> entryList) {
-        streamConfigLogEntityMapper.insertList(entryList);
+        streamConfigLogEntityMapper.insertOrUpdateAll(entryList);
         return true;
     }
 
@@ -57,7 +91,7 @@ public class StreamConfigLogServiceImpl extends AbstractService<StreamConfigLogE
         entity.setInlongGroupId(request.getInlongGroupId());
         entity.setInlongStreamId(request.getInlongStreamId());
         entity.setConfigName(request.getConfigName());
-        entity.setReportTime(request.getReportTime());
+        entity.setReportTime(new Date(request.getReportTime()));
         entity.setVersion(request.getVersion());
         entity.setLogInfo(request.getLogInfo());
         entity.setLogType(request.getLogType());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamMetricServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamMetricServiceImpl.java
deleted file mode 100644
index aa9296c..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamMetricServiceImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamMetricRequest;
-import org.apache.inlong.manager.dao.entity.StreamMetricEntity;
-import org.apache.inlong.manager.dao.mapper.StreamMetricEntityMapper;
-import org.apache.inlong.manager.service.core.StreamMetricService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-public class StreamMetricServiceImpl extends AbstractService<StreamMetricEntity>
-        implements StreamMetricService {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(StreamMetricServiceImpl.class);
-
-    @Autowired
-    private StreamMetricEntityMapper streamMetricEntityMapper;
-
-    @Override
-    public String reportMetric(InlongStreamMetricRequest entity) {
-        if (putData(convertData(entity))) {
-            return "Receive success";
-        } else {
-            LOGGER.warn("Receive Queue is full, data will be discarded !");
-            return "Receive Queue is full, data will be discarded !";
-        }
-    }
-
-    public boolean batchInsertEntities(List<StreamMetricEntity> entryList) {
-        streamMetricEntityMapper.insertList(entryList);
-        return false;
-    }
-
-    public StreamMetricEntity convertData(InlongStreamMetricRequest request) {
-        StreamMetricEntity entity = new StreamMetricEntity();
-        entity.setComponentName(request.getComponentName());
-        entity.setInlongGroupId(request.getInlongGroupId());
-        entity.setInlongStreamId(request.getInlongStreamId());
-        entity.setMetricInfo(request.getMetricInfo());
-        entity.setReportTime(request.getReportTime());
-        entity.setVersion(request.getVersion());
-        entity.setMetricName(request.getMetricName());
-        return entity;
-    }
-}
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 7d36373..530dab0 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1150,42 +1150,20 @@ CREATE TABLE `sort_source_config`
 DROP TABLE IF EXISTS `stream_config_log`;
 CREATE TABLE `stream_config_log`
 (
-    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `ip`               varchar(64)  NOT NULL COMMENT 'client host ip',
-    `version`          varchar(128)          DEFAULT NULL COMMENT 'client version',
-    `inlong_stream_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong stream ID for consumption',
-    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `component_name`   varchar(64)  NOT NULL COMMENT 'current report info component name',
-    `config_name`      varchar(64)           DEFAULT NULL COMMENT 'massage in heartbeat request',
-    `log_type`         int(1)                DEFAULT NULL COMMENT '0 normal, 1 error',
+    `ip`               varchar(24)  NOT NULL COMMENT 'client host ip',
+    `version`          varchar(64)          DEFAULT NULL COMMENT 'client version',
+    `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong stream ID for consumption',
+    `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+    `component_name`   varchar(64)  NOT NULL DEFAULT '' COMMENT 'current report info component name',
+    `config_name`      varchar(64)  NOT NULL DEFAULT '' COMMENT 'massage in heartbeat request',
+    `log_type`         int(1)                DEFAULT 0 COMMENT '0 normal, 1 error',
     `log_info`         text                  DEFAULT NULL COMMENT 'massage in heartbeat request',
     `report_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'report time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`),
-    KEY `index_config_log_report` (`component_name`, `config_name`, `report_time`)
+    PRIMARY KEY (`ip`, `config_name`, `component_name`,`log_type`, `inlong_stream_id`,`inlong_group_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
 
 -- ----------------------------
 
--- Table structure for client metric report
--- ----------------------------
-DROP TABLE IF EXISTS `stream_metric`;
-CREATE TABLE `stream_metric`
-(
-    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `ip`               varchar(64)  NOT NULL COMMENT 'agent host ip',
-    `version`          varchar(128)          DEFAULT NULL COMMENT 'client version',
-    `inlong_stream_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong stream ID for consumption',
-    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `component_name`   varchar(64)  NOT NULL COMMENT 'current report info component name',
-    `metric_name`      varchar(64)  NOT NULL COMMENT 'current report info component name',
-    `metric_info`      text                  DEFAULT NULL COMMENT 'massage in heartbeat request',
-    `report_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'report time',
-    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`),
-    KEY `index_metric_report` (`component_name`, `metric_name`, `report_time`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8 COMMENT ='stream metric report information table';
-
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamMetricController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamConfigLogWebController.java
similarity index 58%
rename from inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamMetricController.java
rename to inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamConfigLogWebController.java
index 6436b16..45a11ed 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamMetricController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamConfigLogWebController.java
@@ -15,30 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.web.controller.openapi;
+package org.apache.inlong.manager.web.controller;
 
+import com.github.pagehelper.PageInfo;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamMetricRequest;
-import org.apache.inlong.manager.service.core.StreamMetricService;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
+import org.apache.inlong.manager.service.core.StreamConfigLogService;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 
 @RestController
-@RequestMapping("/openapi")
-@Api(tags = "Stream metric")
-public class StreamMetricController {
+@RequestMapping("/stream/config/log")
+@Api(tags = "Stream Config log")
+public class StreamConfigLogWebController {
 
     @Autowired
-    private StreamMetricService streamMetricService;
+    private StreamConfigLogService streamConfigLogService;
 
-    @PostMapping("/stream/metric/reportMetricStatus")
-    @ApiOperation(value = "Stream metric status")
-    public Response<String> reportMetricStatus(@RequestBody InlongStreamMetricRequest info) {
-        return Response.success(streamMetricService.reportMetric(info));
+    @RequestMapping(value = "/list", method = RequestMethod.GET)
+    @ApiOperation(value = "Paging query inlong stream config log")
+    public Response<PageInfo<InlongStreamConfigLogListResponse>> listByCondition(
+            InlongStreamConfigLogPageRequest request) {
+        return Response.success(streamConfigLogService.listByCondition(request));
     }
+    
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamConfigLogController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamConfigLogController.java
index 57f2f68..4fe03f1 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamConfigLogController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamConfigLogController.java
@@ -23,20 +23,22 @@ import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
 import org.apache.inlong.manager.service.core.StreamConfigLogService;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 @RestController
-@RequestMapping("/openapi")
+@RequestMapping("/openapi/stream")
 @Api(tags = "Stream Config")
 public class StreamConfigLogController {
 
     @Autowired
     private StreamConfigLogService streamConfigLogService;
 
-    @PostMapping("/stream/log/reportConfigLogStatus")
+    @PostMapping(value = "/log/reportConfigLogStatus",
+            produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
     @ApiOperation(value = "stream config log status")
     public Response<String> reportStreamConfigLogStatus(@RequestBody
             InlongStreamConfigLogRequest info) {