You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/02/25 04:24:04 UTC

[incubator-inlong] branch master updated: [INLONG-2703][Common] Add common reporter to report stream config log and stream metric(#2704)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a325394  [INLONG-2703][Common] Add common reporter to report stream config log and stream metric(#2704)
a325394 is described below

commit a3253948c5033769cfd1485cb6cc82198ca3b13e
Author: baomingyu <ba...@163.com>
AuthorDate: Fri Feb 25 12:23:59 2022 +0800

    [INLONG-2703][Common] Add common reporter to report stream config log and stream metric(#2704)
---
 inlong-common/pom.xml                              |  36 ++++
 .../inlong/common/reporpter/AbstractReporter.java  | 198 +++++++++++++++++++++
 .../inlong/common/reporpter/ResponseType.java      |  32 ++++
 .../common/reporpter/StreamConfigLogReporter.java  |  40 +++++
 .../common/reporpter/StreamMetricReporter.java     |  40 +++++
 .../common/reporpter/dto/StreamConfigLogInfo.java  |  46 +++++
 .../common/reporpter/dto/StreamMetricInfo.java     |  46 +++++
 .../common/metric/reporter/ReporterTest.java       |  57 ++++++
 8 files changed, 495 insertions(+)

diff --git a/inlong-common/pom.xml b/inlong-common/pom.xml
index 461a90f..5783a7e 100644
--- a/inlong-common/pom.xml
+++ b/inlong-common/pom.xml
@@ -38,6 +38,11 @@
         <simpleclient_httpserver.version>0.14.1</simpleclient_httpserver.version>
         <common.lang.version>2.4</common.lang.version>
         <commons-lang3.version>3.3.2</commons-lang3.version>
+        <httpcore.version>4.4.14</httpcore.version>
+        <httpclient.version>4.5.13</httpclient.version>
+        <gson.version>2.8.5</gson.version>
+        <common.lang3.vserion>3.11</common.lang3.vserion>
+        <mock.version>4.3.1</mock.version>
     </properties>
 
     <dependencies>
@@ -62,6 +67,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <artifactId>gson</artifactId>
+            <groupId>com.google.code.gson</groupId>
+            <version>${gson.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
@@ -92,6 +103,31 @@
             <artifactId>simpleclient_httpserver</artifactId>
             <version>${simpleclient_httpserver.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${common.lang3.vserion}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcore.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpclient.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
new file mode 100644
index 0000000..e831675
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/AbstractReporter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import com.google.gson.Gson;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AbstractReporter<T, R> {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(AbstractReporter.class);
+
+    public static final String AGENT_HTTP_APPLICATION_JSON = "application/json";
+
+    private static final Gson gson = new Gson();
+
+    private static final int DEFAULT_CORE_POOL_SIZE = 1;
+
+    private static final int DEFAULT_MAX_POOL_SIZE = 2;
+
+    private static final int DEFAULT_SYNC_SEND_QUEUE_SIZE = 10000;
+
+    private static CloseableHttpClient httpClient;
+
+    private final Class<?> clazz = ResponseType.class;
+
+    private ThreadPoolExecutor pool;
+
+    private String serverUrl;
+
+    public AbstractReporter(String serverUrl) {
+        this(serverUrl, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
+                DEFAULT_SYNC_SEND_QUEUE_SIZE, null);
+    }
+
+    public AbstractReporter(CloseableHttpClient httpClient, String serverUrl) {
+        this(httpClient, serverUrl, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
+                DEFAULT_SYNC_SEND_QUEUE_SIZE, null);
+    }
+
+    public AbstractReporter(String serverUrl, int corePoolSize, int maximumPoolsize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        this.serverUrl = serverUrl;
+        if (httpClient == null) {
+            RequestConfig requestConfig = RequestConfig.custom().build();
+            HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
+            httpClientBuilder.setDefaultRequestConfig(requestConfig);
+            httpClient = httpClientBuilder.build();
+        }
+        if (rejectedExecutionHandler == null) {
+            rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
+        }
+        pool = new ThreadPoolExecutor(corePoolSize, maximumPoolsize,
+                60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(syncSendQueueSize),
+                Executors.defaultThreadFactory(), rejectedExecutionHandler);
+    }
+
+    public AbstractReporter(CloseableHttpClient httpClient, String serverUrl, int corePoolSize,
+            int maximumPoolSize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        this(serverUrl, corePoolSize, maximumPoolSize, syncSendQueueSize, rejectedExecutionHandler);
+        this.httpClient = httpClient;
+    }
+
+    public R syncReportData(T data, String serverUrl) throws Exception {
+        HttpPost httpPost = new HttpPost(serverUrl);
+        try {
+            StringEntity stringEntity = new StringEntity(gson.toJson(data));
+            stringEntity.setContentType(AGENT_HTTP_APPLICATION_JSON);
+            httpPost.setEntity(stringEntity);
+            String returnStr = executeHttpPost(httpPost);
+            ResponseType<R> re = parse(returnStr);
+            if (re != null) {
+                return re.getResponse();
+            }
+        } catch (Exception e) {
+            LOGGER.error("syncReportData has exception e = {}", e);
+            throw e;
+        }
+        return null;
+    }
+
+    public R syncReportData(T data) throws Exception {
+        return this.syncReportData(data, serverUrl);
+    }
+
+    public String executeHttpPost(HttpPost httpPost) throws Exception {
+        CloseableHttpResponse response = httpClient.execute(httpPost);
+        if (response == null) {
+            return null;
+        }
+        return EntityUtils.toString(response.getEntity());
+    }
+
+    public Future<R>  asyncReportData(T data, String serverUrl) {
+        CompletableFuture<R> completableFuture = new CompletableFuture<>();
+
+        if (pool != null) {
+            pool.execute(new RunTask(completableFuture, data, serverUrl));
+        } else {
+            completableFuture.completeExceptionally(new Exception("Send pool is null!"));
+        }
+
+        return completableFuture;
+    }
+
+    public Future<R> asyncReportData(T data) {
+        return asyncReportData(data, serverUrl);
+    }
+
+    public ResponseType<R> parse(String json) throws Exception {
+
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        ParameterizedType type = (ParameterizedType) this.getClass().getGenericSuperclass();
+
+        Type objectType = buildType(clazz, type.getActualTypeArguments());
+
+        return gson.fromJson(json, objectType);
+    }
+
+    private ParameterizedType buildType(final Class<?> raw, final Type... args) {
+
+        return new ParameterizedType() {
+
+            public Type getRawType() {
+                return raw;
+            }
+
+            public Type[] getActualTypeArguments() {
+                return args;
+            }
+
+            public Type getOwnerType() {
+                return null;
+            }
+        };
+    }
+
+    class RunTask implements Runnable {
+
+        private CompletableFuture<R> completableFuture;
+
+        private T data;
+
+        private String url;
+
+        public RunTask(CompletableFuture<R> completableFuture, T data, String url) {
+            this.completableFuture = completableFuture;
+            this.data = data;
+            this.url = url;
+        }
+
+        public void run() {
+            try {
+                completableFuture.complete(syncReportData(data, url));
+            } catch (Exception e) {
+                completableFuture.completeExceptionally(e);
+            }
+        }
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java
new file mode 100644
index 0000000..f0f1298
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/ResponseType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Setter
+@Getter
+public class ResponseType<T> {
+
+    private T response;
+
+    public T getResponse() {
+        return response;
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
new file mode 100644
index 0000000..7e1e077
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamConfigLogReporter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
+
+public class StreamConfigLogReporter extends AbstractReporter<StreamConfigLogInfo, String> {
+
+    public StreamConfigLogReporter(String serverUrl) {
+        super(serverUrl);
+    }
+
+    public StreamConfigLogReporter(CloseableHttpClient httpClient, String serverUrl) {
+        super(httpClient, serverUrl);
+    }
+
+    public StreamConfigLogReporter(String serverUrl, int corePoolSize, int maximumPoolsize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        super(serverUrl, corePoolSize, maximumPoolsize, syncSendQueueSize,
+                rejectedExecutionHandler);
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java
new file mode 100644
index 0000000..bd774b8
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/StreamMetricReporter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.inlong.common.reporpter.dto.StreamMetricInfo;
+
+public class StreamMetricReporter extends AbstractReporter<StreamMetricInfo, String> {
+
+    public StreamMetricReporter(String serverUrl) {
+        super(serverUrl);
+    }
+
+    public StreamMetricReporter(CloseableHttpClient httpClient, String serverUrl) {
+        super(httpClient, serverUrl);
+    }
+
+    public StreamMetricReporter(String serverUrl, int corePoolSize, int maximumPoolsize,
+            int syncSendQueueSize,
+            RejectedExecutionHandler rejectedExecutionHandler) {
+        super(serverUrl, corePoolSize, maximumPoolsize, syncSendQueueSize,
+                rejectedExecutionHandler);
+    }
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
new file mode 100644
index 0000000..0457b6c
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamConfigLogInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter.dto;
+
+import java.util.Date;
+import lombok.Getter;
+import lombok.Setter;
+
+@Setter
+@Getter
+public class StreamConfigLogInfo {
+
+    private String ip;
+
+    private String version;
+
+    private String componentName;
+
+    private String configName;
+
+    private Integer logType;
+
+    private Date reportTime;
+
+    private String logInfo;
+
+    private String inlongGroupId;
+
+    private String inlongStreamId;
+
+}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java
new file mode 100644
index 0000000..b887d91
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/reporpter/dto/StreamMetricInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.reporpter.dto;
+
+import java.util.Date;
+import lombok.Getter;
+import lombok.Setter;
+
+@Setter
+@Getter
+public class StreamMetricInfo {
+
+    private String ip;
+
+    private String version;
+
+    private String componentName;
+
+    private String metricName;
+
+    private Integer logType;
+
+    private Date reportTime;
+
+    private String metricInfo;
+
+    private String inlongGroupId;
+
+    private String inlongStreamId;
+
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
new file mode 100644
index 0000000..a05dd1a
--- /dev/null
+++ b/inlong-common/src/test/java/org/apache/inlong/common/metric/reporter/ReporterTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.metric.reporter;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.Future;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.inlong.common.reporpter.StreamConfigLogReporter;
+import org.apache.inlong.common.reporpter.StreamMetricReporter;
+import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
+import org.apache.inlong.common.reporpter.dto.StreamMetricInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ReporterTest {
+
+    @Test
+    public void streamConfigLogReporterTest() throws Exception {
+        String serverUrl = "http://127.0.0.1:/8080/openapi/stream/log/reportConfigLogStatus";
+        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+        StreamConfigLogReporter streamConfigLogReporter = new StreamConfigLogReporter(httpClient,
+                serverUrl);
+        StreamConfigLogReporter spy = Mockito.spy(streamConfigLogReporter);
+        StreamConfigLogInfo info = new StreamConfigLogInfo();
+        Future<String> future = spy.asyncReportData(info, serverUrl);
+        Assert.assertEquals(future.get(),null);
+    }
+
+    @Test
+    public void streamMetricReporterTest() throws Exception {
+        String serverUrl = "http://127.0.0.1:/8080/openapi/stream/metric/reportMetricStatus";
+        CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
+        StreamMetricReporter streamMetricReporter = new StreamMetricReporter(httpClient,
+                serverUrl);
+        StreamMetricReporter spy = Mockito.spy(streamMetricReporter);
+        StreamMetricInfo info = new StreamMetricInfo();
+        Future<String> future = spy.asyncReportData(info, serverUrl);
+        Assert.assertEquals(future.get(),null);
+    }
+}