You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/25 04:24:04 UTC
[incubator-inlong] branch master updated: [INLONG-2703][Common] Add common reporter to report stream config log and stream metric(#2704)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a325394 [INLONG-2703][Common] Add common reporter to report stream config log and stream metric(#2704)
a325394 is described below
commit a3253948c5033769cfd1485cb6cc82198ca3b13e
Author: baomingyu <ba...@163.com>
AuthorDate: Fri Feb 25 12:23:59 2022 +0800
[INLONG-2703][Common] Add common reporter to report stream config log and stream metric(#2704)
---
inlong-common/pom.xml | 36 ++++
.../inlong/common/reporpter/AbstractReporter.java | 198 +++++++++++++++++++++
.../inlong/common/reporpter/ResponseType.java | 32 ++++
.../common/reporpter/StreamConfigLogReporter.java | 40 +++++
.../common/reporpter/StreamMetricReporter.java | 40 +++++
.../common/reporpter/dto/StreamConfigLogInfo.java | 46 +++++
.../common/reporpter/dto/StreamMetricInfo.java | 46 +++++
.../common/metric/reporter/ReporterTest.java | 57 ++++++
8 files changed, 495 insertions(+)
diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index 461a90f..5783a7e 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -38,6 +38,11 @@
<simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
<common.lang.version>2.4</common.lang.version>
<commons-lang3.version>3.3.2</commons-lang3.version>
+ <httpcore.version>4.4.14</httpcore.version>
+ <httpclient.version>4.5.13</httpclient.version>
+ <gson.version>2.8.5</gson.version>
+ <common.lang3.vserion>3.11</common.lang3.vserion>
+ <mock.version>4.3.1</mock.version>
</properties>
<dependencies>
@@ -62,6 +67,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <artifactId>gson</artifactId>
+ <groupId>com.google.code.gson</groupId>
+ <version>${gson.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
@@ -92,6 +103,31 @@
<artifactId>simpleclient_httpserver</artifactId>
<version>${simpleclient_httpserver.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${common.lang3.vserion}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>${httpcore.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mock.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
new file mode 100644
index 0000000..e831675
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import com.google.gson.Gson;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for reporters that POST a JSON-serialized payload to a server URL and decode
+ * the JSON reply into a {@link ResponseType}.
+ *
+ * <p>Reports can be sent synchronously on the calling thread or asynchronously on an
+ * internal bounded thread pool. Concrete reporters are expected to extend this class with
+ * concrete type arguments; the response type argument is read reflectively from the
+ * subclass declaration when decoding the reply.
+ *
+ * @param <T> type of the payload that is serialized to JSON and sent
+ * @param <R> type of the {@code response} field expected in the reply
+ */
+public class AbstractReporter<T, R> {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(AbstractReporter.class);
+
+    public static final String AGENT_HTTP_APPLICATION_JSON = "application/json";
+
+    /** Charset used to encode the request body and as the fallback for decoding replies. */
+    private static final String JSON_CHARSET = "UTF-8";
+
+    private static final Gson gson = new Gson();
+
+    private static final int DEFAULT_CORE_POOL_SIZE = 1;
+
+    private static final int DEFAULT_MAX_POOL_SIZE = 2;
+
+    private static final int DEFAULT_SYNC_SEND_QUEUE_SIZE = 10000;
+
+    /**
+     * Client used by this reporter only. It is an instance field so that injecting a
+     * custom client into one reporter does not replace the client of every other reporter.
+     */
+    private final CloseableHttpClient httpClient;
+
+    private final ThreadPoolExecutor pool;
+
+    private final String serverUrl;
+
+    /**
+     * Lazily creates the default client that is shared by all reporters constructed
+     * without an explicit client.
+     */
+    private static final class DefaultHttpClientHolder {
+
+        private static final CloseableHttpClient INSTANCE = buildDefaultHttpClient();
+    }
+
+    /**
+     * Creates a reporter with the shared default HTTP client and default pool settings.
+     *
+     * @param serverUrl URL used by the single-argument report methods
+     */
+    public AbstractReporter(String serverUrl) {
+        this(serverUrl, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
+                DEFAULT_SYNC_SEND_QUEUE_SIZE, null);
+    }
+
+    /**
+     * Creates a reporter with a caller-supplied HTTP client and default pool settings.
+     *
+     * @param httpClient client used to send reports; {@code null} selects the shared default
+     * @param serverUrl URL used by the single-argument report methods
+     */
+    public AbstractReporter(CloseableHttpClient httpClient, String serverUrl) {
+        this(httpClient, serverUrl, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
+                DEFAULT_SYNC_SEND_QUEUE_SIZE, null);
+    }
+
+    /**
+     * Creates a reporter with the shared default HTTP client and custom pool settings.
+     *
+     * @param serverUrl URL used by the single-argument report methods
+     * @param corePoolSize core size of the asynchronous send pool
+     * @param maximumPoolsize maximum size of the asynchronous send pool
+     * @param syncSendQueueSize capacity of the queue holding pending asynchronous reports
+     * @param rejectedExecutionHandler handler for reports that cannot be queued;
+     *        {@code null} selects {@link ThreadPoolExecutor.DiscardOldestPolicy}
+     */
+    public AbstractReporter(String serverUrl, int corePoolSize, int maximumPoolsize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        this(null, serverUrl, corePoolSize, maximumPoolsize, syncSendQueueSize,
+                rejectedExecutionHandler);
+    }
+
+    /**
+     * Creates a reporter with a caller-supplied HTTP client and custom pool settings.
+     *
+     * <p>NOTE: with the default {@link ThreadPoolExecutor.DiscardOldestPolicy} the oldest
+     * queued report is dropped when the queue is full, and the future returned for that
+     * report is never completed.
+     *
+     * @param httpClient client used to send reports; {@code null} selects the shared default
+     * @param serverUrl URL used by the single-argument report methods
+     * @param corePoolSize core size of the asynchronous send pool
+     * @param maximumPoolSize maximum size of the asynchronous send pool
+     * @param syncSendQueueSize capacity of the queue holding pending asynchronous reports
+     * @param rejectedExecutionHandler handler for reports that cannot be queued;
+     *        {@code null} selects {@link ThreadPoolExecutor.DiscardOldestPolicy}
+     */
+    public AbstractReporter(CloseableHttpClient httpClient, String serverUrl, int corePoolSize,
+            int maximumPoolSize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        this.serverUrl = serverUrl;
+        this.httpClient = httpClient != null ? httpClient : DefaultHttpClientHolder.INSTANCE;
+        RejectedExecutionHandler handler = rejectedExecutionHandler != null
+                ? rejectedExecutionHandler
+                : new ThreadPoolExecutor.DiscardOldestPolicy();
+        this.pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+                60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(syncSendQueueSize),
+                Executors.defaultThreadFactory(), handler);
+    }
+
+    /** Builds the HTTP client used when the caller does not supply one. */
+    private static CloseableHttpClient buildDefaultHttpClient() {
+        RequestConfig requestConfig = RequestConfig.custom().build();
+        HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+        httpClientBuilder.setDefaultRequestConfig(requestConfig);
+        return httpClientBuilder.build();
+    }
+
+    /**
+     * Serializes {@code data} to JSON, POSTs it to {@code serverUrl} on the calling thread
+     * and returns the decoded {@code response} field of the reply.
+     *
+     * @param data payload to report
+     * @param serverUrl target URL
+     * @return the decoded response, or {@code null} when the reply is absent or empty
+     * @throws Exception any failure while sending or decoding; it is logged and rethrown
+     */
+    public R syncReportData(T data, String serverUrl) throws Exception {
+        HttpPost httpPost = new HttpPost(serverUrl);
+        try {
+            // Encode explicitly as UTF-8: StringEntity otherwise falls back to ISO-8859-1,
+            // which cannot represent non-Latin characters in the JSON body.
+            StringEntity stringEntity = new StringEntity(gson.toJson(data), JSON_CHARSET);
+            stringEntity.setContentType(AGENT_HTTP_APPLICATION_JSON);
+            httpPost.setEntity(stringEntity);
+            ResponseType<R> re = parse(executeHttpPost(httpPost));
+            return re == null ? null : re.getResponse();
+        } catch (Exception e) {
+            // Pass the exception as the trailing argument so SLF4J logs its stack trace.
+            LOGGER.error("syncReportData has exception", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Reports {@code data} synchronously to the URL given at construction time.
+     *
+     * @param data payload to report
+     * @return the decoded response, or {@code null} when the reply is absent or empty
+     * @throws Exception any failure while sending or decoding
+     */
+    public R syncReportData(T data) throws Exception {
+        return this.syncReportData(data, serverUrl);
+    }
+
+    /**
+     * Executes the request and returns the reply body as a string.
+     *
+     * @param httpPost fully prepared request
+     * @return the reply body, or {@code null} when there is no response or no entity
+     * @throws Exception any failure while executing the request or reading the reply
+     */
+    public String executeHttpPost(HttpPost httpPost) throws Exception {
+        // try-with-resources releases the connection; a null resource is skipped on close.
+        try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
+            if (response == null || response.getEntity() == null) {
+                return null;
+            }
+            // UTF-8 is only the fallback; a charset declared by the reply takes precedence.
+            return EntityUtils.toString(response.getEntity(), JSON_CHARSET);
+        }
+    }
+
+    /**
+     * Queues a report on the internal pool and returns a future for its result.
+     *
+     * @param data payload to report
+     * @param serverUrl target URL
+     * @return future completed with the decoded response, or exceptionally on failure
+     */
+    public Future<R> asyncReportData(T data, String serverUrl) {
+        CompletableFuture<R> completableFuture = new CompletableFuture<>();
+        pool.execute(new RunTask(completableFuture, data, serverUrl));
+        return completableFuture;
+    }
+
+    /**
+     * Queues a report to the URL given at construction time.
+     *
+     * @param data payload to report
+     * @return future completed with the decoded response, or exceptionally on failure
+     */
+    public Future<R> asyncReportData(T data) {
+        return asyncReportData(data, serverUrl);
+    }
+
+    /**
+     * Decodes a JSON reply into a {@code ResponseType<R>}.
+     *
+     * @param json reply body
+     * @return the decoded reply, or {@code null} when {@code json} is null or empty
+     * @throws Exception when the JSON cannot be decoded
+     */
+    public ResponseType<R> parse(String json) throws Exception {
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+        // ResponseType has a single type parameter, so only the response type argument is
+        // passed on; passing both arguments would bind the request type to the field.
+        Type objectType = buildType(ResponseType.class, resolveResponseArgument());
+        return gson.fromJson(json, objectType);
+    }
+
+    /**
+     * Finds the actual type bound to {@code R} by walking up from the runtime class to the
+     * declaration that directly extends this class. Falls back to {@code Object} when the
+     * argument is not available as a concrete type (direct instantiation, or a subclass
+     * that forwards its own type variable).
+     */
+    private Type resolveResponseArgument() {
+        Class<?> current = getClass();
+        while (current != null && current != AbstractReporter.class) {
+            Type parent = current.getGenericSuperclass();
+            if (parent instanceof ParameterizedType) {
+                ParameterizedType parameterized = (ParameterizedType) parent;
+                if (AbstractReporter.class.equals(parameterized.getRawType())) {
+                    Type responseArgument = parameterized.getActualTypeArguments()[1];
+                    return responseArgument instanceof java.lang.reflect.TypeVariable
+                            ? Object.class
+                            : responseArgument;
+                }
+            }
+            current = current.getSuperclass();
+        }
+        return Object.class;
+    }
+
+    /** Creates a top-level parameterized type {@code raw<args...>} for Gson. */
+    private ParameterizedType buildType(final Class<?> raw, final Type... args) {
+        return new ParameterizedType() {
+
+            @Override
+            public Type getRawType() {
+                return raw;
+            }
+
+            @Override
+            public Type[] getActualTypeArguments() {
+                return args;
+            }
+
+            @Override
+            public Type getOwnerType() {
+                return null;
+            }
+        };
+    }
+
+    /** Pool task that performs one synchronous report and completes the given future. */
+    class RunTask implements Runnable {
+
+        private final CompletableFuture<R> completableFuture;
+
+        private final T data;
+
+        private final String url;
+
+        public RunTask(CompletableFuture<R> completableFuture, T data, String url) {
+            this.completableFuture = completableFuture;
+            this.data = data;
+            this.url = url;
+        }
+
+        @Override
+        public void run() {
+            try {
+                completableFuture.complete(syncReportData(data, url));
+            } catch (Exception e) {
+                completableFuture.completeExceptionally(e);
+            }
+        }
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java
new file mode 100644
index 0000000..f0f1298
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Envelope that a JSON reply is decoded into; the payload is carried in the
+ * {@code response} field. Accessors are generated by Lombok.
+ *
+ * @param <T> type of the {@code response} payload
+ */
+@Setter
+@Getter
+public class ResponseType<T> {
+
+    private T response;
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
new file mode 100644
index 0000000..7e1e077
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
+
+/**
+ * Reporter that sends {@link StreamConfigLogInfo} payloads and expects a {@code String}
+ * response. All behaviour is inherited from {@link AbstractReporter}.
+ */
+public class StreamConfigLogReporter extends AbstractReporter<StreamConfigLogInfo, String> {
+
+    /**
+     * @param serverUrl URL the reports are sent to by default
+     */
+    public StreamConfigLogReporter(String serverUrl) {
+        super(serverUrl);
+    }
+
+    /**
+     * @param httpClient client used to send the reports
+     * @param serverUrl URL the reports are sent to by default
+     */
+    public StreamConfigLogReporter(CloseableHttpClient httpClient, String serverUrl) {
+        super(httpClient, serverUrl);
+    }
+
+    /**
+     * @param serverUrl URL the reports are sent to by default
+     * @param corePoolSize core size of the asynchronous send pool
+     * @param maximumPoolsize maximum size of the asynchronous send pool
+     * @param syncSendQueueSize capacity of the pending-report queue
+     * @param rejectedExecutionHandler handler for reports that cannot be queued
+     */
+    public StreamConfigLogReporter(String serverUrl, int corePoolSize, int maximumPoolsize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        super(serverUrl, corePoolSize, maximumPoolsize, syncSendQueueSize,
+                rejectedExecutionHandler);
+    }
+
+    /**
+     * Mirrors the base-class constructor that accepts both a custom client and custom
+     * pool settings.
+     *
+     * @param httpClient client used to send the reports
+     * @param serverUrl URL the reports are sent to by default
+     * @param corePoolSize core size of the asynchronous send pool
+     * @param maximumPoolSize maximum size of the asynchronous send pool
+     * @param syncSendQueueSize capacity of the pending-report queue
+     * @param rejectedExecutionHandler handler for reports that cannot be queued
+     */
+    public StreamConfigLogReporter(CloseableHttpClient httpClient, String serverUrl,
+            int corePoolSize, int maximumPoolSize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        super(httpClient, serverUrl, corePoolSize, maximumPoolSize, syncSendQueueSize,
+                rejectedExecutionHandler);
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java
new file mode 100644
index 0000000..bd774b8
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.inlong.common.reporpter.dto.StreamMetricInfo;
+
+/**
+ * Reporter that sends {@link StreamMetricInfo} payloads and expects a {@code String}
+ * response. All behaviour is inherited from {@link AbstractReporter}.
+ */
+public class StreamMetricReporter extends AbstractReporter<StreamMetricInfo, String> {
+
+    /**
+     * @param serverUrl URL the reports are sent to by default
+     */
+    public StreamMetricReporter(String serverUrl) {
+        super(serverUrl);
+    }
+
+    /**
+     * @param httpClient client used to send the reports
+     * @param serverUrl URL the reports are sent to by default
+     */
+    public StreamMetricReporter(CloseableHttpClient httpClient, String serverUrl) {
+        super(httpClient, serverUrl);
+    }
+
+    /**
+     * @param serverUrl URL the reports are sent to by default
+     * @param corePoolSize core size of the asynchronous send pool
+     * @param maximumPoolsize maximum size of the asynchronous send pool
+     * @param syncSendQueueSize capacity of the pending-report queue
+     * @param rejectedExecutionHandler handler for reports that cannot be queued
+     */
+    public StreamMetricReporter(String serverUrl, int corePoolSize, int maximumPoolsize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        super(serverUrl, corePoolSize, maximumPoolsize, syncSendQueueSize,
+                rejectedExecutionHandler);
+    }
+
+    /**
+     * Mirrors the base-class constructor that accepts both a custom client and custom
+     * pool settings.
+     *
+     * @param httpClient client used to send the reports
+     * @param serverUrl URL the reports are sent to by default
+     * @param corePoolSize core size of the asynchronous send pool
+     * @param maximumPoolSize maximum size of the asynchronous send pool
+     * @param syncSendQueueSize capacity of the pending-report queue
+     * @param rejectedExecutionHandler handler for reports that cannot be queued
+     */
+    public StreamMetricReporter(CloseableHttpClient httpClient, String serverUrl,
+            int corePoolSize, int maximumPoolSize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        super(httpClient, serverUrl, corePoolSize, maximumPoolSize, syncSendQueueSize,
+                rejectedExecutionHandler);
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
new file mode 100644
index 0000000..0457b6c
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter.dto;
+
+import java.util.Date;
+import lombok.Getter;
+import lombok.Setter;
+
+@Setter
+@Getter
+public class StreamConfigLogInfo { // payload type sent by StreamConfigLogReporter; Lombok generates the accessors
+ // presumably the IP address of the reporting component - TODO confirm
+ private String ip;
+ // presumably the version of the reporting component - TODO confirm
+ private String version;
+ // name of the component the log refers to (inferred from the field name)
+ private String componentName;
+ // name of the configuration the log refers to (inferred from the field name)
+ private String configName;
+ // integer log-type code; the meaning of the values is not defined in this class
+ private Integer logType;
+ // time of the report (inferred from the field name)
+ private Date reportTime;
+ // log content - presumably free-form text; confirm against the server API
+ private String logInfo;
+ // InLong group id the log belongs to (inferred from the field name)
+ private String inlongGroupId;
+ // InLong stream id the log belongs to (inferred from the field name)
+ private String inlongStreamId;
+
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java
new file mode 100644
index 0000000..b887d91
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter.dto;
+
+import java.util.Date;
+import lombok.Getter;
+import lombok.Setter;
+
+@Setter
+@Getter
+public class StreamMetricInfo { // payload type sent by StreamMetricReporter; Lombok generates the accessors
+ // presumably the IP address of the reporting component - TODO confirm
+ private String ip;
+ // presumably the version of the reporting component - TODO confirm
+ private String version;
+ // name of the component the metric refers to (inferred from the field name)
+ private String componentName;
+ // name of the reported metric (inferred from the field name)
+ private String metricName;
+ // integer type code; the meaning of the values is not defined in this class
+ private Integer logType;
+ // time of the report (inferred from the field name)
+ private Date reportTime;
+ // metric content - presumably a serialized value or description; confirm against the server API
+ private String metricInfo;
+ // InLong group id the metric belongs to (inferred from the field name)
+ private String inlongGroupId;
+ // InLong stream id the metric belongs to (inferred from the field name)
+ private String inlongStreamId;
+
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
new file mode 100644
index 0000000..a05dd1a
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.metric.reporter;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.Future;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.inlong.common.reporpter.StreamConfigLogReporter;
+import org.apache.inlong.common.reporpter.StreamMetricReporter;
+import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
+import org.apache.inlong.common.reporpter.dto.StreamMetricInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Smoke tests for the stream reporters. The HTTP client is a Mockito mock whose
+ * {@code execute} call returns {@code null}, so each asynchronous report is expected to
+ * complete with a {@code null} response.
+ */
+public class ReporterTest {
+
+    @Test
+    public void streamConfigLogReporterTest() throws Exception {
+        // Port belongs directly after the colon; the previous "127.0.0.1:/8080" was malformed.
+        String serverUrl = "http://127.0.0.1:8080/openapi/stream/log/reportConfigLogStatus";
+        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+        StreamConfigLogReporter streamConfigLogReporter = new StreamConfigLogReporter(httpClient,
+                serverUrl);
+        StreamConfigLogReporter spy = Mockito.spy(streamConfigLogReporter);
+        StreamConfigLogInfo info = new StreamConfigLogInfo();
+        Future<String> future = spy.asyncReportData(info, serverUrl);
+        Assert.assertNull(future.get());
+    }
+
+    @Test
+    public void streamMetricReporterTest() throws Exception {
+        // Port belongs directly after the colon; the previous "127.0.0.1:/8080" was malformed.
+        String serverUrl = "http://127.0.0.1:8080/openapi/stream/metric/reportMetricStatus";
+        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+        StreamMetricReporter streamMetricReporter = new StreamMetricReporter(httpClient,
+                serverUrl);
+        StreamMetricReporter spy = Mockito.spy(streamMetricReporter);
+        StreamMetricInfo info = new StreamMetricInfo();
+        Future<String> future = spy.asyncReportData(info, serverUrl);
+        Assert.assertNull(future.get());
+    }
+}