You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/04 06:21:01 UTC
[incubator-inlong] branch master updated: [INLONG-2805][DataProxy] Add stream config log report (#2806)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 39dd6f6 [INLONG-2805][DataProxy] Add stream config log report (#2806)
39dd6f6 is described below
commit 39dd6f60cd1a80b541cf17f7da2604dbb8079324
Author: baomingyu <ba...@163.com>
AuthorDate: Fri Mar 4 14:20:55 2022 +0800
[INLONG-2805][DataProxy] Add stream config log report (#2806)
---
.../agent/plugin/sources/reader/BinlogReader.java | 39 +++-
.../inlong/common/reporpter/AbstractReporter.java | 57 ++----
...treamMetricInfo.java => ConfigLogTypeEnum.java} | 43 ++---
.../reporpter/{ResponseType.java => Response.java} | 11 +-
.../common/reporpter/StreamConfigLogMetric.java | 134 ++++++++++++++
.../common/reporpter/StreamConfigLogReporter.java | 2 +-
.../common/reporpter/StreamMetricReporter.java | 40 ----
.../common/reporpter/dto/StreamConfigLogInfo.java | 3 +-
.../common/metric/reporter/ReporterTest.java | 17 +-
inlong-dataproxy/conf/common.properties | 7 +-
.../inlong/dataproxy/consts/ConfigConstants.java | 3 +
.../apache/inlong/dataproxy/sink/PulsarSink.java | 29 ++-
.../dataproxy/sink/pulsar/PulsarClientService.java | 92 +++++++--
...java => InlongStreamConfigLogListResponse.java} | 4 +-
....java => InlongStreamConfigLogPageRequest.java} | 15 +-
.../pojo/stream/InlongStreamConfigLogRequest.java | 3 +-
.../manager/dao/entity/StreamConfigLogEntity.java | 14 +-
.../manager/dao/entity/StreamMetricEntity.java | 197 --------------------
.../dao/mapper/StreamConfigLogEntityMapper.java | 16 +-
.../dao/mapper/StreamMetricEntityMapper.java | 39 ----
.../src/main/resources/generatorConfig.xml | 8 -
.../mappers/StreamConfigLogEntityMapper.xml | 206 ++++-----------------
.../resources/mappers/StreamMetricEntityMapper.xml | 204 --------------------
.../service/core/StreamConfigLogService.java | 6 +
.../manager/service/core/StreamMetricService.java | 26 ---
.../core/impl/StreamConfigLogServiceImpl.java | 38 +++-
.../service/core/impl/StreamMetricServiceImpl.java | 67 -------
.../manager-web/sql/apache_inlong_manager.sql | 38 +---
...ller.java => StreamConfigLogWebController.java} | 29 +--
.../openapi/StreamConfigLogController.java | 6 +-
30 files changed, 455 insertions(+), 938 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
index 1f25c78..052695b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/BinlogReader.java
@@ -34,11 +34,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.snapshot.BinlogSnapshotBase;
import org.apache.inlong.agent.pojo.DebeziumFormat;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
+import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.slf4j.Logger;
@@ -48,6 +52,7 @@ public class BinlogReader implements Reader {
private static final Logger LOGGER = LoggerFactory.getLogger(BinlogReader.class);
+ private static final String COMPONENT_NAME = "BinlogReader";
private static final String JOB_DATABASE_USER = "job.binlogJob.user";
private static final String JOB_DATABASE_PASSWORD = "job.binlogJob.password";
private static final String JOB_DATABASE_HOSTNAME = "job.binlogJob.hostname";
@@ -94,6 +99,12 @@ public class BinlogReader implements Reader {
private JobProfile jobProfile;
private static final Gson gson = new Gson();
+ private boolean enableReportConfigLog;
+ private StreamConfigLogMetric streamConfigLogMetric;
+
+ private String inlongGroupId;
+ private String inlongStreamId;
+
public BinlogReader() {
}
@@ -140,6 +151,27 @@ public class BinlogReader implements Reader {
binlogSnapshot = new BinlogSnapshotBase(offsetStoreFileName);
binlogSnapshot.save(offset);
+ enableReportConfigLog =
+ Boolean.parseBoolean(jobConf.get(StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE,
+ "true"));
+
+ inlongGroupId = jobConf.get(CommonConstants.PROXY_INLONG_GROUP_ID,
+ CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID);
+ inlongStreamId = jobConf.get(CommonConstants.PROXY_INLONG_STREAM_ID,
+ CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID);
+
+ if (enableReportConfigLog) {
+ String reportConfigServerUrl = jobConf
+ .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_SERVER_URL, "");
+ String reportConfigLogInterval = jobConf
+ .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL, "60000");
+ String clientVersion = jobConf
+ .get(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
+ streamConfigLogMetric = new StreamConfigLogMetric(COMPONENT_NAME,
+ reportConfigServerUrl, Long.parseLong(reportConfigLogInterval),
+ AgentUtils.getLocalIp(), clientVersion);
+ }
+
Properties props = getEngineProps();
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
@@ -162,8 +194,11 @@ public class BinlogReader implements Reader {
})
.using((success, message, error) -> {
if (!success) {
- LOGGER.error("binlog job with jobConf {} has "
- + "error {}", jobConf.getInstanceId(), message, error);
+ LOGGER.error("binlog job with jobConf {} has " + "error {}",
+ jobConf.getInstanceId(), message, error);
+ streamConfigLogMetric
+ .updateConfigLog(inlongGroupId, inlongStreamId, "DBConfig",
+ ConfigLogTypeEnum.ERROR, error == null ? "" : error.toString());
}
}).build();
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
index e831675..6324eaa 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
@@ -18,8 +18,6 @@
package org.apache.inlong.common.reporpter;
import com.google.gson.Gson;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -38,7 +36,7 @@ import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AbstractReporter<T, R> {
+public class AbstractReporter<T> {
public static final Logger LOGGER = LoggerFactory.getLogger(AbstractReporter.class);
@@ -54,7 +52,7 @@ public class AbstractReporter<T, R> {
private static CloseableHttpClient httpClient;
- private final Class<?> clazz = ResponseType.class;
+ private final Class<?> clazz = Response.class;
private ThreadPoolExecutor pool;
@@ -96,25 +94,26 @@ public class AbstractReporter<T, R> {
this.httpClient = httpClient;
}
- public R syncReportData(T data, String serverUrl) throws Exception {
+ public Response syncReportData(T data, String serverUrl) throws Exception {
+ if (StringUtils.isEmpty(serverUrl)) {
+ LOGGER.warn("Report config log server url is empty, so config log can not be "
+ + "reported!");
+ return null;
+ }
HttpPost httpPost = new HttpPost(serverUrl);
try {
StringEntity stringEntity = new StringEntity(gson.toJson(data));
stringEntity.setContentType(AGENT_HTTP_APPLICATION_JSON);
httpPost.setEntity(stringEntity);
String returnStr = executeHttpPost(httpPost);
- ResponseType<R> re = parse(returnStr);
- if (re != null) {
- return re.getResponse();
- }
+ return parse(returnStr);
} catch (Exception e) {
LOGGER.error("syncReportData has exception e = {}", e);
throw e;
}
- return null;
}
- public R syncReportData(T data) throws Exception {
+ public Response syncReportData(T data) throws Exception {
return this.syncReportData(data, serverUrl);
}
@@ -126,8 +125,8 @@ public class AbstractReporter<T, R> {
return EntityUtils.toString(response.getEntity());
}
- public Future<R> asyncReportData(T data, String serverUrl) {
- CompletableFuture<R> completableFuture = new CompletableFuture<>();
+ public Future<Response> asyncReportData(T data, String serverUrl) {
+ CompletableFuture<Response> completableFuture = new CompletableFuture<>();
if (pool != null) {
pool.execute(new RunTask(completableFuture, data, serverUrl));
@@ -138,50 +137,28 @@ public class AbstractReporter<T, R> {
return completableFuture;
}
- public Future<R> asyncReportData(T data) {
+ public Future<Response> asyncReportData(T data) {
return asyncReportData(data, serverUrl);
}
- public ResponseType<R> parse(String json) throws Exception {
+ public Response parse(String json) throws Exception {
if (StringUtils.isEmpty(json)) {
return null;
}
- ParameterizedType type = (ParameterizedType) this.getClass().getGenericSuperclass();
-
- Type objectType = buildType(clazz, type.getActualTypeArguments());
-
- return gson.fromJson(json, objectType);
- }
-
- private ParameterizedType buildType(final Class<?> raw, final Type... args) {
-
- return new ParameterizedType() {
-
- public Type getRawType() {
- return raw;
- }
-
- public Type[] getActualTypeArguments() {
- return args;
- }
-
- public Type getOwnerType() {
- return null;
- }
- };
+ return gson.fromJson(json, Response.class);
}
class RunTask implements Runnable {
- private CompletableFuture<R> completableFuture;
+ private CompletableFuture<Response> completableFuture;
private T data;
private String url;
- public RunTask(CompletableFuture<R> completableFuture, T data, String url) {
+ public RunTask(CompletableFuture<Response> completableFuture, T data, String url) {
this.completableFuture = completableFuture;
this.data = data;
this.url = url;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
similarity index 56%
rename from inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java
rename to inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
index b887d91..d7d654a 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ConfigLogTypeEnum.java
@@ -15,32 +15,33 @@
* limitations under the License.
*/
-package org.apache.inlong.common.reporpter.dto;
+package org.apache.inlong.common.reporpter;
-import java.util.Date;
-import lombok.Getter;
-import lombok.Setter;
+import static java.util.Objects.requireNonNull;
-@Setter
-@Getter
-public class StreamMetricInfo {
+public enum ConfigLogTypeEnum {
- private String ip;
+ NORMAL(0),ERROR(1);
- private String version;
+ private int type;
- private String componentName;
+ ConfigLogTypeEnum(int type) {
+ this.type = type;
+ }
- private String metricName;
-
- private Integer logType;
-
- private Date reportTime;
-
- private String metricInfo;
-
- private String inlongGroupId;
-
- private String inlongStreamId;
+ public static ConfigLogTypeEnum getOpType(int opType) {
+ requireNonNull(opType);
+ switch (opType) {
+ case 0:
+ return NORMAL;
+ case 1:
+ return ERROR;
+ default:
+ throw new RuntimeException("config log type doesn't exist");
+ }
+ }
+ public int getType() {
+ return type;
+ }
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
similarity index 88%
rename from inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java
rename to inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
index f0f1298..4f4d117 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/Response.java
@@ -22,11 +22,8 @@ import lombok.Setter;
@Setter
@Getter
-public class ResponseType<T> {
-
- private T response;
-
- public T getResponse() {
- return response;
- }
+public class Response {
+ private boolean success;
+ private String errMsg;
+ private String data;
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
new file mode 100644
index 0000000..17f2eb2
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogMetric.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.time.Instant;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamConfigLogMetric implements Runnable {
+
+ public static final Logger LOGGER = LoggerFactory.getLogger(StreamConfigLogMetric.class);
+
+ /*
+ * config name for log report
+ */
+ public static final String CONFIG_LOG_REPORT_ENABLE = "report.config.log.enable";
+ public static final String CONFIG_LOG_REPORT_SERVER_URL = "report.config.log.server.url";
+ public static final String CONFIG_LOG_REPORT_INTERVAL = "report.config.log.interval";
+ public static final String CONFIG_LOG_REPORT_CLIENT_VERSION = "report.config.log.client.version";
+ public static final String CONFIG_LOG_PULSAR_PRODUCER = "pulsar-producer";
+ public static final String CONFIG_LOG_PULSAR_CLIENT = "pulsar-client";
+
+ private StreamConfigLogReporter streamConfigLogReporter;
+
+ private String moduleName;
+
+ private String clientVersion;
+
+ private String localIp;
+
+ private long reportInterval;
+
+ public ConcurrentHashMap<String, StreamConfigLogInfo> dataCacheMap = new ConcurrentHashMap<>();
+
+ private static ScheduledExecutorService statExecutor =
+ Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder()
+ .setNameFormat("StreamConfigLogMetric-Report")
+ .setUncaughtExceptionHandler((t, e) ->
+ LOGGER.error(t.getName() + " has an uncaught exception: ", e))
+ .build());
+
+ public StreamConfigLogMetric(String moduleName, String serverUrl, long reportInterval,
+ String localIp, String clientVersion) {
+ this.streamConfigLogReporter = new StreamConfigLogReporter(serverUrl);
+ this.reportInterval = reportInterval;
+ this.moduleName = moduleName;
+ this.localIp = localIp;
+ this.clientVersion = clientVersion;
+ statExecutor.scheduleWithFixedDelay(this,
+ reportInterval, reportInterval, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * updateConfigLog
+ * @param inlongGroupId inlongGroupId
+ * @param inlongStreamId inlongStreamId
+ * @param configName configName
+ * @param configLogTypeEnum configLogTypeEnum
+ * @param log log
+ */
+ public void updateConfigLog(String inlongGroupId, String inlongStreamId, String configName,
+ ConfigLogTypeEnum configLogTypeEnum, String log) {
+ String key = moduleName + "-" + inlongGroupId + "-" + inlongStreamId + "-" + configName;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("updateConfigLog key = {}", key);
+ }
+ dataCacheMap.compute(key, (k, v) -> {
+ if (v == null) {
+ v = new StreamConfigLogInfo();
+ }
+ updateDataValue(v, inlongGroupId,
+ inlongStreamId, configName, configLogTypeEnum, log);
+ return v;
+ });
+ }
+
+ private void updateDataValue(StreamConfigLogInfo streamConfigLogInfo,
+ String inlongGroupId, String inlongStreamId, String configName,
+ ConfigLogTypeEnum configLogTypeEnum, String log) {
+ streamConfigLogInfo.setComponentName(moduleName);
+ streamConfigLogInfo.setConfigName(configName);
+ streamConfigLogInfo.setInlongGroupId(inlongGroupId);
+ streamConfigLogInfo.setInlongStreamId(inlongStreamId);
+ streamConfigLogInfo.setIp(localIp);
+ streamConfigLogInfo.setVersion(clientVersion);
+ streamConfigLogInfo.setLogInfo(log);
+ streamConfigLogInfo.setReportTime(Instant.now().toEpochMilli());
+ streamConfigLogInfo.setLogType(configLogTypeEnum.getType());
+ }
+
+ public void run() {
+ try {
+ Set<Entry<String, StreamConfigLogInfo>> set = dataCacheMap.entrySet();
+ long currentTimeMills = Instant.now().toEpochMilli();
+ for (Entry<String, StreamConfigLogInfo> entry : set) {
+ StreamConfigLogInfo streamConfigLogInfo = entry.getValue();
+ if ((currentTimeMills - streamConfigLogInfo.getReportTime()) < reportInterval) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Report metric data config key = {}!", streamConfigLogInfo.getConfigName());
+ }
+ streamConfigLogReporter.asyncReportData(streamConfigLogInfo);
+ } else {
+ dataCacheMap.remove(entry.getKey());
+ LOGGER.info("Remove expired config key {}", entry.getKey());
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Report streamConfigLogMetric has exception = {}", e);
+ }
+ }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
index 7e1e077..14009a7 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
@@ -21,7 +21,7 @@ import java.util.concurrent.RejectedExecutionHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
-public class StreamConfigLogReporter extends AbstractReporter<StreamConfigLogInfo, String> {
+public class StreamConfigLogReporter extends AbstractReporter<StreamConfigLogInfo> {
public StreamConfigLogReporter(String serverUrl) {
super(serverUrl);
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java
deleted file mode 100644
index bd774b8..0000000
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.reporpter;
-
-import java.util.concurrent.RejectedExecutionHandler;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.inlong.common.reporpter.dto.StreamMetricInfo;
-
-public class StreamMetricReporter extends AbstractReporter<StreamMetricInfo, String> {
-
- public StreamMetricReporter(String serverUrl) {
- super(serverUrl);
- }
-
- public StreamMetricReporter(CloseableHttpClient httpClient, String serverUrl) {
- super(httpClient, serverUrl);
- }
-
- public StreamMetricReporter(String serverUrl, int corePoolSize, int maximumPoolsize,
- int syncSendQueueSize,
- RejectedExecutionHandler rejectedExecutionHandler) {
- super(serverUrl, corePoolSize, maximumPoolsize, syncSendQueueSize,
- rejectedExecutionHandler);
- }
-}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
index 0457b6c..3ce50e1 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
@@ -17,7 +17,6 @@
package org.apache.inlong.common.reporpter.dto;
-import java.util.Date;
import lombok.Getter;
import lombok.Setter;
@@ -35,7 +34,7 @@ public class StreamConfigLogInfo {
private Integer logType;
- private Date reportTime;
+ private long reportTime;
private String logInfo;
diff --git a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
index a05dd1a..e7fdc4f 100644
--- a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
+++ b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
@@ -20,11 +20,10 @@ package org.apache.inlong.common.metric.reporter;
import static org.mockito.Mockito.mock;
import java.util.concurrent.Future;
+import org.apache.inlong.common.reporpter.Response;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.inlong.common.reporpter.StreamConfigLogReporter;
-import org.apache.inlong.common.reporpter.StreamMetricReporter;
import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
-import org.apache.inlong.common.reporpter.dto.StreamMetricInfo;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -39,19 +38,7 @@ public class ReporterTest {
serverUrl);
StreamConfigLogReporter spy = Mockito.spy(streamConfigLogReporter);
StreamConfigLogInfo info = new StreamConfigLogInfo();
- Future<String> future = spy.asyncReportData(info, serverUrl);
- Assert.assertEquals(future.get(),null);
- }
-
- @Test
- public void streamMetricReporterTest() throws Exception {
- String serverUrl = "http://127.0.0.1:/8080/openapi/stream/metric/reportMetricStatus";
- CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
- StreamMetricReporter streamMetricReporter = new StreamMetricReporter(httpClient,
- serverUrl);
- StreamMetricReporter spy = Mockito.spy(streamMetricReporter);
- StreamMetricInfo info = new StreamMetricInfo();
- Future<String> future = spy.asyncReportData(info, serverUrl);
+ Future<Response> future = spy.asyncReportData(info, serverUrl);
Assert.assertEquals(future.get(),null);
}
}
diff --git a/inlong-dataproxy/conf/common.properties b/inlong-dataproxy/conf/common.properties
index bc18246..873ade9 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -32,4 +32,9 @@ prometheusHttpPort=8080
# whether to enable audit
audit.enable=true
# audit proxy address
-audit.proxys=127.0.0.1:10081
\ No newline at end of file
+audit.proxys=127.0.0.1:10081
+
+# report config log
+report.config.log.enable=true
+report.config.log.server.url=http://127.0.0.1:8083/openapi/stream/log/reportConfigLogStatus
+report.config.log.interval=60000
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index d498b9c..e3b6d8e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -19,6 +19,8 @@ package org.apache.inlong.dataproxy.consts;
public class ConfigConstants {
+ public static final String COMPONENT_NAME = "Inlong-DataProxy";
+
public static final String CONFIG_PORT = "port";
public static final String CONFIG_HOST = "host";
@@ -115,4 +117,5 @@ public class ConfigConstants {
public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = "session_max_allowed_delayed_msg_count";
public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark";
public static final String RECOVER_THREAD_COUNT = "recover_thread_count";
+
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 965c41e..10f230c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -37,6 +37,7 @@ import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
+import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
@@ -145,10 +146,8 @@ public class PulsarSink extends AbstractSink implements Configurable,
private static final LogCounter logPrinterB = new LogCounter(10, 100000, 60 * 1000);
private static final LogCounter logPrinterC = new LogCounter(10, 100000, 60 * 1000);
- private static final String SINK_THREAD_NUM = "thread-num";
private int eventQueueSize = 10000;
private int badEventQueueSize = 10000;
- private int threadNum;
/*
* send thread pool
@@ -165,6 +164,9 @@ public class PulsarSink extends AbstractSink implements Configurable,
private MonitorIndexExt monitorIndexExt;
private SinkCounter sinkCounter;
+ private StreamConfigLogMetric streamConfigLogMetric;
+ private String localIp;
+
/*
* metric
*/
@@ -172,6 +174,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
private DataProxyMetricItemSet metricItemSet;
private ConfigManager configManager;
+ private Map<String, String> commonProperties;
private Map<String, String> topicProperties;
private Map<String, String> pulsarCluster;
@@ -221,14 +224,32 @@ public class PulsarSink extends AbstractSink implements Configurable,
topicProperties = configManager.getTopicProperties();
pulsarCluster = configManager.getThirdPartyClusterUrl2Token();
pulsarConfig = configManager.getThirdPartyClusterConfig(); //pulsar common config
+ commonProperties = configManager.getCommonProperties();
pulsarClientService = new PulsarClientService(pulsarConfig);
+ boolean enableReportConfigLog =
+ Boolean.parseBoolean(commonProperties
+ .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_ENABLE,"true"));
+ localIp = NetworkUtils.getLocalIp();
+ if (enableReportConfigLog) {
+ String reportConfigServerUrl = commonProperties
+ .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_SERVER_URL, "");
+ String reportConfigLogInterval = commonProperties
+ .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL, "60000");
+ String clientVersion = commonProperties
+ .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
+ streamConfigLogMetric = new StreamConfigLogMetric(ConfigConstants.COMPONENT_NAME,
+ reportConfigServerUrl, Long.parseLong(reportConfigLogInterval),
+ localIp, clientVersion);
+ pulsarClientService.setConfigLogMetric(streamConfigLogMetric);
+ }
+
configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() {
@Override
public void update() {
if (pulsarClientService != null) {
diffSetPublish(pulsarClientService,
- new HashSet<String>(topicProperties.values()),
- new HashSet<String>(configManager.getTopicProperties().values()));
+ new HashSet<>(topicProperties.values()),
+ new HashSet<>(configManager.getTopicProperties().values()));
}
}
});
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
index bc3dc29..7c518fa 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/PulsarClientService.java
@@ -22,6 +22,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.inlong.common.monitor.LogCounter;
+import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
+import org.apache.inlong.common.reporpter.StreamConfigLogMetric;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.ThirdPartyClusterConfig;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
@@ -79,6 +81,8 @@ public class PulsarClientService {
public int pulsarConnectionsPreBroker;
private String localIp = "127.0.0.1";
+ private StreamConfigLogMetric streamConfigLogMetric;
+
/**
* PulsarClientService
*
@@ -108,6 +112,10 @@ public class PulsarClientService {
localIp = NetworkUtils.getLocalIp();
}
+ public void setConfigLogMetric(StreamConfigLogMetric streamConfigLogMetric) {
+ this.streamConfigLogMetric = streamConfigLogMetric;
+ }
+
public void initCreateConnection(CreatePulsarClientCallBack callBack) {
try {
createConnection(callBack);
@@ -129,8 +137,10 @@ public class PulsarClientService {
public boolean sendMessage(String topic, Event event,
SendMessageCallBack sendMessageCallBack, EventStat es) {
TopicProducerInfo producer = null;
+ final String inlongStreamId = getInlongStreamId(event);
+ final String inlongGroupId = getInlongGroupId(event);
try {
- producer = getProducer(topic);
+ producer = getProducer(topic, inlongGroupId, inlongStreamId);
} catch (Exception e) {
if (logPrinterA.shouldPrint()) {
/*
@@ -140,6 +150,11 @@ public class PulsarClientService {
*/
logger.error("Get producer failed!", e);
}
+ if (streamConfigLogMetric != null) {
+ streamConfigLogMetric.updateConfigLog(inlongGroupId,
+ inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+ ConfigLogTypeEnum.ERROR, e.toString());
+ }
}
/*
* If the producer is a null value,\ it means that the topic is not yet
@@ -159,13 +174,8 @@ public class PulsarClientService {
Map<String, String> proMap = new HashMap<>();
proMap.put("data_proxy_ip", localIp);
- String streamId = "";
- if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) {
- streamId = event.getHeaders().get(AttributeConstants.STREAM_ID);
- } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) {
- streamId = event.getHeaders().get(AttributeConstants.INAME);
- }
- proMap.put(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+ proMap.put(inlongStreamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+
TopicProducerInfo forCallBackP = producer;
forCallBackP.getProducer().newMessage().properties(proMap).value(event.getBody())
.sendAsync().thenAccept((msgId) -> {
@@ -173,6 +183,11 @@ public class PulsarClientService {
forCallBackP.setCanUseSend(true);
sendMessageCallBack.handleMessageSendSuccess(topic, (MessageIdImpl) msgId, es);
}).exceptionally((e) -> {
+ if (streamConfigLogMetric != null) {
+ streamConfigLogMetric.updateConfigLog(inlongGroupId,
+ inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_PRODUCER,
+ ConfigLogTypeEnum.ERROR, e.toString());
+ }
forCallBackP.setCanUseSend(false);
sendMessageCallBack.handleMessageSendException(topic, es, e);
return null;
@@ -204,6 +219,12 @@ public class PulsarClientService {
callBack.handleCreateClientSuccess(info.getKey());
} catch (PulsarClientException e) {
callBack.handleCreateClientException(info.getKey());
+ if (streamConfigLogMetric != null) {
+ streamConfigLogMetric.updateConfigLog("DataProxyGlobal",
+ "DataProxyGlobal",
+ StreamConfigLogMetric.CONFIG_LOG_PULSAR_CLIENT,
+ ConfigLogTypeEnum.ERROR, e.toString());
+ }
logger.error("create connnection error in metasink, "
+ "maybe pulsar master set error, please re-check.url{}, ex1 {}",
info.getKey(),
@@ -235,14 +256,15 @@ public class PulsarClientService {
return builder.build();
}
- public List<TopicProducerInfo> initTopicProducer(String topic) {
+ public List<TopicProducerInfo> initTopicProducer(String topic, String inlongGroupId,
+ String inlongStreamId) {
List<TopicProducerInfo> producerInfoList = producerInfoMap.computeIfAbsent(topic, (k) -> {
List<TopicProducerInfo> newList = null;
if (pulsarClients != null) {
newList = new ArrayList<>();
for (PulsarClient pulsarClient : pulsarClients.values()) {
TopicProducerInfo info = new TopicProducerInfo(pulsarClient, topic);
- info.initProducer();
+ info.initProducer(inlongGroupId, inlongStreamId);
if (info.isCanUseToSendMessage()) {
newList.add(info);
}
@@ -256,8 +278,14 @@ public class PulsarClientService {
return producerInfoList;
}
- private TopicProducerInfo getProducer(String topic) {
- List<TopicProducerInfo> producerList = initTopicProducer(topic);
+ public List<TopicProducerInfo> initTopicProducer(String topic) {
+ return initTopicProducer(topic, null, null); // no group/stream id known: delegate with null ids
+ }
+
+ private TopicProducerInfo getProducer(String topic, String inlongGroupId,
+ String inlongStreamId) {
+ List<TopicProducerInfo> producerList =
+ initTopicProducer(topic, inlongGroupId, inlongStreamId);
AtomicLong topicIndex = topicSendIndexMap.computeIfAbsent(topic, (k) -> {
return new AtomicLong(0);
});
@@ -371,6 +399,30 @@ public class PulsarClientService {
}
}
+ /**
+ * Resolve the stream id carried in the event headers; STREAM_ID takes precedence over INAME.
+ * @param event event whose headers are inspected
+ * @return the matching header value, or "" when neither header key is present
+ */
+ private String getInlongStreamId(Event event) {
+ final Map<String, String> headers = event.getHeaders();
+ if (headers.containsKey(AttributeConstants.STREAM_ID)) {
+ return headers.get(AttributeConstants.STREAM_ID);
+ }
+ return headers.containsKey(AttributeConstants.INAME)
+ ? headers.get(AttributeConstants.INAME)
+ : "";
+ }
+
+ /**
+ * Read the inlong group id from the GROUP_ID header of the event.
+ * @param event event whose headers are read
+ * @return the GROUP_ID header value; presumably null when the header is absent (Map#get semantics)
+ */
+ private String getInlongGroupId(Event event) {
+ return event.getHeaders().get(AttributeConstants.GROUP_ID);
+ }
+
public void close() {
destroyConnection();
}
@@ -394,7 +446,7 @@ public class PulsarClientService {
this.topic = topic;
}
- public void initProducer() {
+ public void initProducer(String inlongGroupId, String inlongStreamId) { // ids only label the error log/report
try {
producer = pulsarClient.newProducer().sendTimeout(sendTimeout,
TimeUnit.MILLISECONDS)
@@ -408,11 +460,23 @@ public class PulsarClientService {
.create();
isFinishInit = true;
} catch (PulsarClientException e) {
- logger.error("create pulsar client has error e = {}", e);
+ logger.error("create pulsar client has error, inlongGroupId = {}, "
+ + "inlongStreamId = {}", inlongGroupId, inlongStreamId, e); // Throwable last => stack trace is logged
isFinishInit = false;
+ if (streamConfigLogMetric != null
+ && StringUtils.isNotEmpty(inlongGroupId)
+ && StringUtils.isNotEmpty(inlongStreamId)) {
+ streamConfigLogMetric.updateConfigLog(inlongGroupId,
+ inlongStreamId, StreamConfigLogMetric.CONFIG_LOG_PULSAR_CLIENT,
+ ConfigLogTypeEnum.ERROR, e.toString());
+ }
}
}
+ public void initProducer() {
+ initProducer(null, null); // null ids fail the isNotEmpty checks, so no config-log entry is reported on failure
+ }
+
public void setCanUseSend(Boolean isCanUseSend) {
this.isCanUseSend = isCanUseSend;
if (!isCanUseSend) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogListResponse.java
similarity index 94%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogListResponse.java
index 3262011..a71c0c6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogListResponse.java
@@ -25,8 +25,8 @@ import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
-@ApiModel("Inlong stream config log")
-public class InlongStreamConfigLogRequest
+@ApiModel("Inlong stream config log query response")
+public class InlongStreamConfigLogListResponse
extends InlongStreamBaseInfo {
@ApiModelProperty(value = "ip")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogPageRequest.java
similarity index 77%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogPageRequest.java
index 3262011..3ca7587 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogPageRequest.java
@@ -22,12 +22,20 @@ import io.swagger.annotations.ApiModelProperty;
import java.util.Date;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.beans.PageRequest;
+import org.springframework.format.annotation.DateTimeFormat;
@Data
@EqualsAndHashCode(callSuper = true)
-@ApiModel("Inlong stream config log")
-public class InlongStreamConfigLogRequest
- extends InlongStreamBaseInfo {
+@ApiModel("Inlong stream config log query request")
+public class InlongStreamConfigLogPageRequest
+ extends PageRequest {
+
+ @ApiModelProperty(value = "Inlong group id")
+ private String inlongGroupId;
+
+ @ApiModelProperty(value = "Inlong stream id")
+ private String inlongStreamId;
@ApiModelProperty(value = "ip")
private String ip;
@@ -45,6 +53,7 @@ public class InlongStreamConfigLogRequest
private Integer logType;
@ApiModelProperty(value = "report time")
+ @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date reportTime;
@ApiModelProperty(value = "long info")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
index 3262011..4a58d05 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamConfigLogRequest.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.common.pojo.stream;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -45,7 +44,7 @@ public class InlongStreamConfigLogRequest
private Integer logType;
@ApiModelProperty(value = "report time")
- private Date reportTime;
+ private long reportTime;
@ApiModelProperty(value = "long info")
private String logInfo;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamConfigLogEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamConfigLogEntity.java
index 0e0667b..dfbd0c8 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamConfigLogEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamConfigLogEntity.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
import java.util.Date;
public class StreamConfigLogEntity implements Serializable {
- private Integer id;
private String ip;
@@ -45,14 +44,6 @@ public class StreamConfigLogEntity implements Serializable {
private static final long serialVersionUID = 1L;
- public Integer getId() {
- return id;
- }
-
- public void setId(Integer id) {
- this.id = id;
- }
-
public String getIp() {
return ip;
}
@@ -145,8 +136,7 @@ public class StreamConfigLogEntity implements Serializable {
return false;
}
StreamConfigLogEntity other = (StreamConfigLogEntity) that;
- return (this.getId() == null ? other.getId() == null : this.getId().equals(other.getId()))
- && (this.getIp() == null ? other.getIp() == null : this.getIp().equals(other.getIp()))
+ return (this.getIp() == null ? other.getIp() == null : this.getIp().equals(other.getIp()))
&& (this.getVersion() == null ? other.getVersion() == null
: this.getVersion().equals(other.getVersion()))
&& (this.getInlongStreamId() == null ? other.getInlongStreamId() == null
@@ -171,7 +161,6 @@ public class StreamConfigLogEntity implements Serializable {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((getId() == null) ? 0 : getId().hashCode());
result = prime * result + ((getIp() == null) ? 0 : getIp().hashCode());
result = prime * result + ((getVersion() == null) ? 0 : getVersion().hashCode());
result = prime * result + ((getInlongStreamId() == null) ? 0 : getInlongStreamId().hashCode());
@@ -191,7 +180,6 @@ public class StreamConfigLogEntity implements Serializable {
sb.append(getClass().getSimpleName());
sb.append(" [");
sb.append("Hash = ").append(hashCode());
- sb.append(", id=").append(id);
sb.append(", ip=").append(ip);
sb.append(", version=").append(version);
sb.append(", inlongStreamId=").append(inlongStreamId);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamMetricEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamMetricEntity.java
deleted file mode 100644
index 2c9e6fd..0000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamMetricEntity.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import java.io.Serializable;
-import java.util.Date;
-
-public class StreamMetricEntity implements Serializable {
- private Integer id;
-
- private String ip;
-
- private String version;
-
- private String inlongStreamId;
-
- private String inlongGroupId;
-
- private String componentName;
-
- private String metricName;
-
- private Date reportTime;
-
- private Date modifyTime;
-
- private String metricInfo;
-
- private static final long serialVersionUID = 1L;
-
- public Integer getId() {
- return id;
- }
-
- public void setId(Integer id) {
- this.id = id;
- }
-
- public String getIp() {
- return ip;
- }
-
- public void setIp(String ip) {
- this.ip = ip == null ? null : ip.trim();
- }
-
- public String getVersion() {
- return version;
- }
-
- public void setVersion(String version) {
- this.version = version == null ? null : version.trim();
- }
-
- public String getInlongStreamId() {
- return inlongStreamId;
- }
-
- public void setInlongStreamId(String inlongStreamId) {
- this.inlongStreamId = inlongStreamId == null ? null : inlongStreamId.trim();
- }
-
- public String getInlongGroupId() {
- return inlongGroupId;
- }
-
- public void setInlongGroupId(String inlongGroupId) {
- this.inlongGroupId = inlongGroupId == null ? null : inlongGroupId.trim();
- }
-
- public String getComponentName() {
- return componentName;
- }
-
- public void setComponentName(String componentName) {
- this.componentName = componentName == null ? null : componentName.trim();
- }
-
- public String getMetricName() {
- return metricName;
- }
-
- public void setMetricName(String metricName) {
- this.metricName = metricName == null ? null : metricName.trim();
- }
-
- public Date getReportTime() {
- return reportTime;
- }
-
- public void setReportTime(Date reportTime) {
- this.reportTime = reportTime;
- }
-
- public Date getModifyTime() {
- return modifyTime;
- }
-
- public void setModifyTime(Date modifyTime) {
- this.modifyTime = modifyTime;
- }
-
- public String getMetricInfo() {
- return metricInfo;
- }
-
- public void setMetricInfo(String metricInfo) {
- this.metricInfo = metricInfo == null ? null : metricInfo.trim();
- }
-
- @Override
- public boolean equals(Object that) {
- if (this == that) {
- return true;
- }
- if (that == null) {
- return false;
- }
- if (getClass() != that.getClass()) {
- return false;
- }
- StreamMetricEntity other = (StreamMetricEntity) that;
- return (this.getId() == null ? other.getId() == null
- : this.getId().equals(other.getId()))
- && (this.getIp() == null ? other.getIp() == null
- : this.getIp().equals(other.getIp()))
- && (this.getVersion() == null ? other.getVersion() == null
- : this.getVersion().equals(other.getVersion()))
- && (this.getInlongStreamId() == null ? other.getInlongStreamId() == null
- : this.getInlongStreamId().equals(other.getInlongStreamId()))
- && (this.getInlongGroupId() == null ? other.getInlongGroupId() == null
- : this.getInlongGroupId().equals(other.getInlongGroupId()))
- && (this.getComponentName() == null ? other.getComponentName() == null
- : this.getComponentName().equals(other.getComponentName()))
- && (this.getMetricName() == null ? other.getMetricName() == null
- : this.getMetricName().equals(other.getMetricName()))
- && (this.getReportTime() == null ? other.getReportTime() == null
- : this.getReportTime().equals(other.getReportTime()))
- && (this.getModifyTime() == null ? other.getModifyTime() == null
- : this.getModifyTime().equals(other.getModifyTime()))
- && (this.getMetricInfo() == null ? other.getMetricInfo() == null
- : this.getMetricInfo().equals(other.getMetricInfo()));
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((getId() == null) ? 0 : getId().hashCode());
- result = prime * result + ((getIp() == null) ? 0 : getIp().hashCode());
- result = prime * result + ((getVersion() == null) ? 0 : getVersion().hashCode());
- result = prime * result + ((getInlongStreamId() == null) ? 0 : getInlongStreamId().hashCode());
- result = prime * result + ((getInlongGroupId() == null) ? 0 : getInlongGroupId().hashCode());
- result = prime * result + ((getComponentName() == null) ? 0 : getComponentName().hashCode());
- result = prime * result + ((getMetricName() == null) ? 0 : getMetricName().hashCode());
- result = prime * result + ((getReportTime() == null) ? 0 : getReportTime().hashCode());
- result = prime * result + ((getModifyTime() == null) ? 0 : getModifyTime().hashCode());
- result = prime * result + ((getMetricInfo() == null) ? 0 : getMetricInfo().hashCode());
- return result;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(getClass().getSimpleName());
- sb.append(" [");
- sb.append("Hash = ").append(hashCode());
- sb.append(", id=").append(id);
- sb.append(", ip=").append(ip);
- sb.append(", version=").append(version);
- sb.append(", inlongStreamId=").append(inlongStreamId);
- sb.append(", inlongGroupId=").append(inlongGroupId);
- sb.append(", componentName=").append(componentName);
- sb.append(", metricName=").append(metricName);
- sb.append(", reportTime=").append(reportTime);
- sb.append(", modifyTime=").append(modifyTime);
- sb.append(", metricInfo=").append(metricInfo);
- sb.append(", serialVersionUID=").append(serialVersionUID);
- sb.append("]");
- return sb.toString();
- }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
index e4565ad..66b83f0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
@@ -18,22 +18,12 @@
package org.apache.inlong.manager.dao.mapper;
import java.util.List;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
import org.apache.inlong.manager.dao.entity.StreamConfigLogEntity;
public interface StreamConfigLogEntityMapper {
- int deleteByPrimaryKey(Integer id);
- int insertList(List<StreamConfigLogEntity> records);
+ int insertOrUpdateAll(List<StreamConfigLogEntity> records); // batch insert; duplicate keys are updated instead
- int insert(StreamConfigLogEntity record);
-
- int insertSelective(StreamConfigLogEntity record);
-
- StreamConfigLogEntity selectByPrimaryKey(Integer id);
-
- int updateByPrimaryKeySelective(StreamConfigLogEntity record);
-
- int updateByPrimaryKeyWithBLOBs(StreamConfigLogEntity record);
-
- int updateByPrimaryKey(StreamConfigLogEntity record);
+ List<StreamConfigLogEntity> selectByCondition(InlongStreamConfigLogPageRequest request); // optional filters, newest first
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamMetricEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamMetricEntityMapper.java
deleted file mode 100644
index 6b8ae52..0000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamMetricEntityMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.mapper;
-
-import java.util.List;
-import org.apache.inlong.manager.dao.entity.StreamMetricEntity;
-
-public interface StreamMetricEntityMapper {
- int deleteByPrimaryKey(Integer id);
-
- int insertList(List<StreamMetricEntity> records);
-
- int insert(StreamMetricEntity record);
-
- int insertSelective(StreamMetricEntity record);
-
- StreamMetricEntity selectByPrimaryKey(Integer id);
-
- int updateByPrimaryKeySelective(StreamMetricEntity record);
-
- int updateByPrimaryKeyWithBLOBs(StreamMetricEntity record);
-
- int updateByPrimaryKey(StreamMetricEntity record);
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index b44d175..c94568f 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -254,13 +254,6 @@
enableCountByExample="false" enableDeleteByExample="false"
enableSelectByExample="false" enableUpdateByExample="false"/>
- <table tableName="stream_metric" domainObjectName="StreamMetricEntity"
- enableSelectByPrimaryKey="true"
- enableUpdateByPrimaryKey="true"
- enableDeleteByPrimaryKey="true" enableInsert="true"
- enableCountByExample="false" enableDeleteByExample="false"
- enableSelectByExample="false" enableUpdateByExample="false"/>
-
<table tableName="sort_source_config" domainObjectName="SortSourceConfigEntity"
enableSelectByPrimaryKey="true"
enableUpdateByPrimaryKey="true"
@@ -268,6 +261,5 @@
enableCountByExample="false" enableDeleteByExample="false"
enableSelectByExample="false" enableUpdateByExample="false"/>-->
-
</context>
</generatorConfiguration>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamConfigLogEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamConfigLogEntityMapper.xml
index da24f6a..6f281c5 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamConfigLogEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamConfigLogEntityMapper.xml
@@ -21,14 +21,13 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamConfigLogEntityMapper">
<resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
- <id column="id" jdbcType="INTEGER" property="id" />
- <result column="ip" jdbcType="VARCHAR" property="ip" />
+ <id column="ip" jdbcType="VARCHAR" property="ip" />
+ <id column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
+ <id column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+ <id column="component_name" jdbcType="VARCHAR" property="componentName" />
+ <id column="config_name" jdbcType="VARCHAR" property="configName" />
+ <id column="log_type" jdbcType="INTEGER" property="logType" />
<result column="version" jdbcType="VARCHAR" property="version" />
- <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
- <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
- <result column="component_name" jdbcType="VARCHAR" property="componentName" />
- <result column="config_name" jdbcType="VARCHAR" property="configName" />
- <result column="log_type" jdbcType="INTEGER" property="logType" />
<result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
<result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
</resultMap>
@@ -36,180 +35,47 @@
<result column="log_info" jdbcType="LONGVARCHAR" property="logInfo" />
</resultMap>
<sql id="Base_Column_List">
- id, ip, version, inlong_stream_id, inlong_group_id, component_name, config_name,
+ ip, version, inlong_stream_id, inlong_group_id, component_name, config_name,
log_type, report_time, modify_time
</sql>
<sql id="Blob_Column_List">
log_info
</sql>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from stream_config_log
- where id = #{id,jdbcType=INTEGER}
- </select>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- delete from stream_config_log
- where id = #{id,jdbcType=INTEGER}
- </delete>
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
- insert into stream_config_log (id, ip, version,
- inlong_stream_id, inlong_group_id, component_name,
- config_name, log_type, report_time,
- modify_time, log_info)
- values (#{id,jdbcType=INTEGER}, #{ip,jdbcType=VARCHAR}, #{version,jdbcType=VARCHAR},
- #{inlongStreamId,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR}, #{componentName,jdbcType=VARCHAR},
- #{configName,jdbcType=VARCHAR}, #{logType,jdbcType=INTEGER}, #{reportTime,jdbcType=TIMESTAMP},
- #{modifyTime,jdbcType=TIMESTAMP}, #{logInfo,jdbcType=LONGVARCHAR})
- </insert>
- <insert id="insertList" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
+ <insert id="insertOrUpdateAll" parameterType="java.util.List">
insert into stream_config_log (ip, version,
inlong_stream_id, inlong_group_id, component_name,
- config_name, log_type, report_time, log_info)
- values
+ config_name, log_type, report_time, log_info) values
<foreach collection="list" index="index" item="item" open="" close="" separator=",">
- (#{item.ip,jdbcType=VARCHAR}, #{item.version,jdbcType=VARCHAR},
+ (#{item.ip,jdbcType=VARCHAR}, #{item.version,jdbcType=VARCHAR},
#{item.inlongStreamId,jdbcType=VARCHAR}, #{item.inlongGroupId,jdbcType=VARCHAR}, #{item.componentName,jdbcType=VARCHAR},
#{item.configName,jdbcType=VARCHAR}, #{item.logType,jdbcType=INTEGER}, #{item.reportTime,jdbcType=TIMESTAMP},
#{item.logInfo,jdbcType=LONGVARCHAR})
</foreach>
+ ON DUPLICATE key update report_time=values(report_time),log_info=values(log_info) <!-- on a duplicate key only report_time and log_info are refreshed -->
</insert>
- <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
- insert into stream_config_log
- <trim prefix="(" suffix=")" suffixOverrides=",">
- <if test="id != null">
- id,
- </if>
- <if test="ip != null">
- ip,
- </if>
- <if test="version != null">
- version,
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id,
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id,
- </if>
- <if test="componentName != null">
- component_name,
- </if>
- <if test="configName != null">
- config_name,
- </if>
- <if test="logType != null">
- log_type,
- </if>
- <if test="reportTime != null">
- report_time,
- </if>
- <if test="modifyTime != null">
- modify_time,
- </if>
- <if test="logInfo != null">
- log_info,
- </if>
- </trim>
- <trim prefix="values (" suffix=")" suffixOverrides=",">
- <if test="id != null">
- #{id,jdbcType=INTEGER},
- </if>
- <if test="ip != null">
- #{ip,jdbcType=VARCHAR},
- </if>
- <if test="version != null">
- #{version,jdbcType=VARCHAR},
- </if>
- <if test="inlongStreamId != null">
- #{inlongStreamId,jdbcType=VARCHAR},
- </if>
- <if test="inlongGroupId != null">
- #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="componentName != null">
- #{componentName,jdbcType=VARCHAR},
- </if>
- <if test="configName != null">
- #{configName,jdbcType=VARCHAR},
- </if>
- <if test="logType != null">
- #{logType,jdbcType=INTEGER},
- </if>
- <if test="reportTime != null">
- #{reportTime,jdbcType=TIMESTAMP},
- </if>
- <if test="modifyTime != null">
- #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- <if test="logInfo != null">
- #{logInfo,jdbcType=LONGVARCHAR},
- </if>
- </trim>
- </insert>
- <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
- update stream_config_log
- <set>
- <if test="ip != null">
- ip = #{ip,jdbcType=VARCHAR},
- </if>
- <if test="version != null">
- version = #{version,jdbcType=VARCHAR},
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="componentName != null">
- component_name = #{componentName,jdbcType=VARCHAR},
- </if>
- <if test="configName != null">
- config_name = #{configName,jdbcType=VARCHAR},
- </if>
- <if test="logType != null">
- log_type = #{logType,jdbcType=INTEGER},
- </if>
- <if test="reportTime != null">
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- </if>
- <if test="modifyTime != null">
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- <if test="logInfo != null">
- log_info = #{logInfo,jdbcType=LONGVARCHAR},
- </if>
- </set>
- where id = #{id,jdbcType=INTEGER}
- </update>
- <update id="updateByPrimaryKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
- update stream_config_log
- set ip = #{ip,jdbcType=VARCHAR},
- version = #{version,jdbcType=VARCHAR},
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- component_name = #{componentName,jdbcType=VARCHAR},
- config_name = #{configName,jdbcType=VARCHAR},
- log_type = #{logType,jdbcType=INTEGER},
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- log_info = #{logInfo,jdbcType=LONGVARCHAR}
- where id = #{id,jdbcType=INTEGER}
- </update>
- <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StreamConfigLogEntity">
- update stream_config_log
- set ip = #{ip,jdbcType=VARCHAR},
- version = #{version,jdbcType=VARCHAR},
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- component_name = #{componentName,jdbcType=VARCHAR},
- config_name = #{configName,jdbcType=VARCHAR},
- log_type = #{logType,jdbcType=INTEGER},
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- modify_time = #{modifyTime,jdbcType=TIMESTAMP}
- where id = #{id,jdbcType=INTEGER}
- </update>
+ <select id="selectByCondition" resultMap="ResultMapWithBLOBs"
+ parameterType="org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest">
+ <!-- Each optional filter compares the column that matches its own request field. -->
+ select
+ <include refid="Base_Column_List"/>,
+ <include refid="Blob_Column_List" />
+ from stream_config_log
+ where log_type = 1
+ <if test="reportTime != null">
+ and report_time > #{reportTime, jdbcType=TIMESTAMP}
+ </if>
+ <if test="inlongStreamId != null and inlongStreamId != ''">
+ and inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}
+ </if>
+ <if test="inlongGroupId != null and inlongGroupId != ''">
+ and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+ </if>
+ <if test="componentName != null and componentName != ''">
+ and component_name = #{componentName, jdbcType=VARCHAR}
+ </if>
+ <if test="configName != null and configName != ''">
+ and config_name = #{configName, jdbcType=VARCHAR}
+ </if>
+ order by modify_time desc
+ </select>
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamMetricEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamMetricEntityMapper.xml
deleted file mode 100644
index 1c6c972..0000000
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamMetricEntityMapper.xml
+++ /dev/null
@@ -1,204 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamMetricEntityMapper">
- <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
- <id column="id" jdbcType="INTEGER" property="id" />
- <result column="ip" jdbcType="VARCHAR" property="ip" />
- <result column="version" jdbcType="VARCHAR" property="version" />
- <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
- <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
- <result column="component_name" jdbcType="VARCHAR" property="componentName" />
- <result column="metric_name" jdbcType="VARCHAR" property="metricName" />
- <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
- <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
- </resultMap>
- <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
- <result column="metric_info" jdbcType="LONGVARCHAR" property="metricInfo" />
- </resultMap>
- <sql id="Base_Column_List">
- id, ip, version, inlong_stream_id, inlong_group_id, component_name, metric_name,
- report_time, modify_time
- </sql>
- <sql id="Blob_Column_List">
- metric_info
- </sql>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from stream_metric
- where id = #{id,jdbcType=INTEGER}
- </select>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- delete from stream_metric
- where id = #{id,jdbcType=INTEGER}
- </delete>
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
- insert into stream_metric (id, ip, version,
- inlong_stream_id, inlong_group_id, component_name,
- metric_name, report_time, modify_time,
- metric_info)
- values (#{id,jdbcType=INTEGER}, #{ip,jdbcType=VARCHAR}, #{version,jdbcType=VARCHAR},
- #{inlongStreamId,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR}, #{componentName,jdbcType=VARCHAR},
- #{metricName,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP},
- #{metricInfo,jdbcType=LONGVARCHAR})
- </insert>
- <insert id="insertList" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
- insert into stream_metric (id, ip, version,
- inlong_stream_id, inlong_group_id, component_name,
- metric_name, report_time,
- metric_info)
- values
- <foreach collection="list" index="index" item="item" open="" close="" separator=",">
- (#{item.ip,jdbcType=VARCHAR}, #{item.version,jdbcType=VARCHAR},
- #{item.inlongStreamId,jdbcType=VARCHAR}, #{item.inlongGroupId,jdbcType=VARCHAR}, #{item.componentName,jdbcType=VARCHAR},
- #{item.metricName,jdbcType=VARCHAR}, #{item.reportTime,jdbcType=TIMESTAMP},
- #{item.metricInfo,jdbcType=LONGVARCHAR})
- </foreach>
- </insert>
- <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
- insert into stream_metric
- <trim prefix="(" suffix=")" suffixOverrides=",">
- <if test="id != null">
- id,
- </if>
- <if test="ip != null">
- ip,
- </if>
- <if test="version != null">
- version,
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id,
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id,
- </if>
- <if test="componentName != null">
- component_name,
- </if>
- <if test="metricName != null">
- metric_name,
- </if>
- <if test="reportTime != null">
- report_time,
- </if>
- <if test="modifyTime != null">
- modify_time,
- </if>
- <if test="metricInfo != null">
- metric_info,
- </if>
- </trim>
- <trim prefix="values (" suffix=")" suffixOverrides=",">
- <if test="id != null">
- #{id,jdbcType=INTEGER},
- </if>
- <if test="ip != null">
- #{ip,jdbcType=VARCHAR},
- </if>
- <if test="version != null">
- #{version,jdbcType=VARCHAR},
- </if>
- <if test="inlongStreamId != null">
- #{inlongStreamId,jdbcType=VARCHAR},
- </if>
- <if test="inlongGroupId != null">
- #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="componentName != null">
- #{componentName,jdbcType=VARCHAR},
- </if>
- <if test="metricName != null">
- #{metricName,jdbcType=VARCHAR},
- </if>
- <if test="reportTime != null">
- #{reportTime,jdbcType=TIMESTAMP},
- </if>
- <if test="modifyTime != null">
- #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- <if test="metricInfo != null">
- #{metricInfo,jdbcType=LONGVARCHAR},
- </if>
- </trim>
- </insert>
- <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
- update stream_metric
- <set>
- <if test="ip != null">
- ip = #{ip,jdbcType=VARCHAR},
- </if>
- <if test="version != null">
- version = #{version,jdbcType=VARCHAR},
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="componentName != null">
- component_name = #{componentName,jdbcType=VARCHAR},
- </if>
- <if test="metricName != null">
- metric_name = #{metricName,jdbcType=VARCHAR},
- </if>
- <if test="reportTime != null">
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- </if>
- <if test="modifyTime != null">
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- <if test="metricInfo != null">
- metric_info = #{metricInfo,jdbcType=LONGVARCHAR},
- </if>
- </set>
- where id = #{id,jdbcType=INTEGER}
- </update>
- <update id="updateByPrimaryKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
- update stream_metric
- set ip = #{ip,jdbcType=VARCHAR},
- version = #{version,jdbcType=VARCHAR},
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- component_name = #{componentName,jdbcType=VARCHAR},
- metric_name = #{metricName,jdbcType=VARCHAR},
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- metric_info = #{metricInfo,jdbcType=LONGVARCHAR}
- where id = #{id,jdbcType=INTEGER}
- </update>
- <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StreamMetricEntity">
- update stream_metric
- set ip = #{ip,jdbcType=VARCHAR},
- version = #{version,jdbcType=VARCHAR},
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- component_name = #{componentName,jdbcType=VARCHAR},
- metric_name = #{metricName,jdbcType=VARCHAR},
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- modify_time = #{modifyTime,jdbcType=TIMESTAMP}
- where id = #{id,jdbcType=INTEGER}
- </update>
-</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamConfigLogService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamConfigLogService.java
index 16ccb70..79369c3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamConfigLogService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamConfigLogService.java
@@ -17,10 +17,16 @@
package org.apache.inlong.manager.service.core;
+import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
public interface StreamConfigLogService {
String reportConfigLog(InlongStreamConfigLogRequest request);
+ PageInfo<InlongStreamConfigLogListResponse> listByCondition(
+ InlongStreamConfigLogPageRequest request);
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamMetricService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamMetricService.java
deleted file mode 100644
index 85ce1e5..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StreamMetricService.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core;
-
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamMetricRequest;
-
-public interface StreamMetricService {
-
- String reportMetric(InlongStreamMetricRequest entity);
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
index 9978637..5b6726b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
@@ -17,8 +17,19 @@
package org.apache.inlong.manager.service.core.impl;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
import java.util.List;
+import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamConfigLogEntity;
import org.apache.inlong.manager.dao.mapper.StreamConfigLogEntityMapper;
import org.apache.inlong.manager.service.core.StreamConfigLogService;
@@ -46,8 +57,31 @@ public class StreamConfigLogServiceImpl extends AbstractService<StreamConfigLogE
}
}
+ @Override
+ public PageInfo<InlongStreamConfigLogListResponse> listByCondition(
+ InlongStreamConfigLogPageRequest request) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("begin to list source page by " + request);
+ }
+ Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+ PageHelper.startPage(request.getPageNum(), request.getPageSize());
+
+ if (request.getReportTime() == null) {
+ Instant instant = Instant.now().minus(Duration.ofMillis(5));
+ request.setReportTime(new Date(instant.toEpochMilli()));
+ }
+ Page<StreamConfigLogEntity> entityPage = (Page<StreamConfigLogEntity>)
+ streamConfigLogEntityMapper.selectByCondition(request);
+ List<InlongStreamConfigLogListResponse> detailList = CommonBeanUtils
+ .copyListProperties(entityPage, InlongStreamConfigLogListResponse::new);
+
+ PageInfo<InlongStreamConfigLogListResponse> pageInfo = new PageInfo<>(detailList);
+ pageInfo.setTotal(entityPage.getTotal());
+ return pageInfo;
+ }
+
public boolean batchInsertEntities(List<StreamConfigLogEntity> entryList) {
- streamConfigLogEntityMapper.insertList(entryList);
+ streamConfigLogEntityMapper.insertOrUpdateAll(entryList);
return true;
}
@@ -57,7 +91,7 @@ public class StreamConfigLogServiceImpl extends AbstractService<StreamConfigLogE
entity.setInlongGroupId(request.getInlongGroupId());
entity.setInlongStreamId(request.getInlongStreamId());
entity.setConfigName(request.getConfigName());
- entity.setReportTime(request.getReportTime());
+ entity.setReportTime(new Date(request.getReportTime()));
entity.setVersion(request.getVersion());
entity.setLogInfo(request.getLogInfo());
entity.setLogType(request.getLogType());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamMetricServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamMetricServiceImpl.java
deleted file mode 100644
index aa9296c..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamMetricServiceImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamMetricRequest;
-import org.apache.inlong.manager.dao.entity.StreamMetricEntity;
-import org.apache.inlong.manager.dao.mapper.StreamMetricEntityMapper;
-import org.apache.inlong.manager.service.core.StreamMetricService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-public class StreamMetricServiceImpl extends AbstractService<StreamMetricEntity>
- implements StreamMetricService {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(StreamMetricServiceImpl.class);
-
- @Autowired
- private StreamMetricEntityMapper streamMetricEntityMapper;
-
- @Override
- public String reportMetric(InlongStreamMetricRequest entity) {
- if (putData(convertData(entity))) {
- return "Receive success";
- } else {
- LOGGER.warn("Receive Queue is full, data will be discarded !");
- return "Receive Queue is full, data will be discarded !";
- }
- }
-
- public boolean batchInsertEntities(List<StreamMetricEntity> entryList) {
- streamMetricEntityMapper.insertList(entryList);
- return false;
- }
-
- public StreamMetricEntity convertData(InlongStreamMetricRequest request) {
- StreamMetricEntity entity = new StreamMetricEntity();
- entity.setComponentName(request.getComponentName());
- entity.setInlongGroupId(request.getInlongGroupId());
- entity.setInlongStreamId(request.getInlongStreamId());
- entity.setMetricInfo(request.getMetricInfo());
- entity.setReportTime(request.getReportTime());
- entity.setVersion(request.getVersion());
- entity.setMetricName(request.getMetricName());
- return entity;
- }
-}
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 7d36373..530dab0 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1150,42 +1150,20 @@ CREATE TABLE `sort_source_config`
DROP TABLE IF EXISTS `stream_config_log`;
CREATE TABLE `stream_config_log`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `ip` varchar(64) NOT NULL COMMENT 'client host ip',
- `version` varchar(128) DEFAULT NULL COMMENT 'client version',
- `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream ID for consumption',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `component_name` varchar(64) NOT NULL COMMENT 'current report info component name',
- `config_name` varchar(64) DEFAULT NULL COMMENT 'massage in heartbeat request',
- `log_type` int(1) DEFAULT NULL COMMENT '0 normal, 1 error',
+ `ip` varchar(24) NOT NULL COMMENT 'client host ip',
+ `version` varchar(64) DEFAULT NULL COMMENT 'client version',
+ `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong stream ID for consumption',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+ `component_name` varchar(64) NOT NULL DEFAULT '' COMMENT 'current report info component name',
+ `config_name` varchar(64) NOT NULL DEFAULT '' COMMENT 'massage in heartbeat request',
+ `log_type` int(1) DEFAULT 0 COMMENT '0 normal, 1 error',
`log_info` text DEFAULT NULL COMMENT 'massage in heartbeat request',
`report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'report time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`),
- KEY `index_config_log_report` (`component_name`, `config_name`, `report_time`)
+ PRIMARY KEY (`ip`, `config_name`, `component_name`,`log_type`, `inlong_stream_id`,`inlong_group_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
-- ----------------------------
--- Table structure for client metric report
--- ----------------------------
-DROP TABLE IF EXISTS `stream_metric`;
-CREATE TABLE `stream_metric`
-(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `ip` varchar(64) NOT NULL COMMENT 'agent host ip',
- `version` varchar(128) DEFAULT NULL COMMENT 'client version',
- `inlong_stream_id` varchar(256) DEFAULT NULL COMMENT 'Inlong stream ID for consumption',
- `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
- `component_name` varchar(64) NOT NULL COMMENT 'current report info component name',
- `metric_name` varchar(64) NOT NULL COMMENT 'current report info component name',
- `metric_info` text DEFAULT NULL COMMENT 'massage in heartbeat request',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'report time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`),
- KEY `index_metric_report` (`component_name`, `metric_name`, `report_time`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8 COMMENT ='stream metric report information table';
-
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamMetricController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamConfigLogWebController.java
similarity index 58%
rename from inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamMetricController.java
rename to inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamConfigLogWebController.java
index 6436b16..45a11ed 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamMetricController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamConfigLogWebController.java
@@ -15,30 +15,33 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.web.controller.openapi;
+package org.apache.inlong.manager.web.controller;
+import com.github.pagehelper.PageInfo;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamMetricRequest;
-import org.apache.inlong.manager.service.core.StreamMetricService;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
+import org.apache.inlong.manager.service.core.StreamConfigLogService;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
-@RequestMapping("/openapi")
-@Api(tags = "Stream metric")
-public class StreamMetricController {
+@RequestMapping("/stream/config/log")
+@Api(tags = "Stream Config log")
+public class StreamConfigLogWebController {
@Autowired
- private StreamMetricService streamMetricService;
+ private StreamConfigLogService streamConfigLogService;
- @PostMapping("/stream/metric/reportMetricStatus")
- @ApiOperation(value = "Stream metric status")
- public Response<String> reportMetricStatus(@RequestBody InlongStreamMetricRequest info) {
- return Response.success(streamMetricService.reportMetric(info));
+ @RequestMapping(value = "/list", method = RequestMethod.GET)
+ @ApiOperation(value = "Paging query inlong stream config log")
+ public Response<PageInfo<InlongStreamConfigLogListResponse>> listByCondition(
+ InlongStreamConfigLogPageRequest request) {
+ return Response.success(streamConfigLogService.listByCondition(request));
}
+
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamConfigLogController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamConfigLogController.java
index 57f2f68..4fe03f1 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamConfigLogController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/StreamConfigLogController.java
@@ -23,20 +23,22 @@ import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
import org.apache.inlong.manager.service.core.StreamConfigLogService;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
-@RequestMapping("/openapi")
+@RequestMapping("/openapi/stream")
@Api(tags = "Stream Config")
public class StreamConfigLogController {
@Autowired
private StreamConfigLogService streamConfigLogService;
- @PostMapping("/stream/log/reportConfigLogStatus")
+ @PostMapping(value = "/log/reportConfigLogStatus",
+ produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
@ApiOperation(value = "stream config log status")
public Response<String> reportStreamConfigLogStatus(@RequestBody
InlongStreamConfigLogRequest info) {