You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/06 14:20:30 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector]support flink-connecor-http (#1798)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d50fe1ed [Feature][Connector]support flink-connecor-http (#1798)
d50fe1ed is described below
commit d50fe1ed9fc69f28ab87aae2b1f482f04e19e2c9
Author: tmljob <69...@users.noreply.github.com>
AuthorDate: Fri May 6 22:20:24 2022 +0800
[Feature][Connector]support flink-connecor-http (#1798)
* support flink-connector-http
* modify it use-case name
---
docs/en/connector/source/Http.md | 2 +-
seatunnel-connectors/plugin-mapping.properties | 1 +
.../seatunnel-connectors-flink-dist/pom.xml | 6 +
.../seatunnel-connectors-flink/pom.xml | 1 +
.../seatunnel-connector-flink-http/pom.xml | 64 +++++
.../apache/seatunnel/flink/http/source/Http.java | 152 ++++++++++
.../flink/http/source/constant/Settings.java | 30 ++
.../flink/http/source/util/HttpClientResult.java | 61 ++++
.../flink/http/source/util/HttpClientUtils.java | 318 +++++++++++++++++++++
.../org.apache.seatunnel.flink.BaseFlinkSource | 18 ++
.../e2e/flink/http/HttpSourceToConsoleIT.java | 35 +++
.../test/resources/http/httpsource_to_console.conf | 50 ++++
12 files changed, 737 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector/source/Http.md b/docs/en/connector/source/Http.md
index 84b74c3d..d9d3a399 100644
--- a/docs/en/connector/source/Http.md
+++ b/docs/en/connector/source/Http.md
@@ -9,7 +9,7 @@ Get data from http or https interface
Engine Supported and plugin name
* [x] Spark: Http
-* [ ] Flink
+* [x] Flink: Http
:::
diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties
index 58536d02..619911fe 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -27,6 +27,7 @@ flink.source.InfluxDbSource = seatunnel-connector-flink-influxdb
flink.source.JdbcSource = seatunnel-connector-flink-jdbc
flink.source.KafkaTableStream = seatunnel-connector-flink-kafka
flink.source.SocketStream = seatunnel-connector-flink-socket
+flink.source.Http = seatunnel-connector-flink-http
# Flink Sink
diff --git a/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
index 37808da3..0e3ca69d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
@@ -86,6 +86,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-flink-http</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
<build>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
index 2d462a99..bb3495dd 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
@@ -43,6 +43,7 @@
<module>seatunnel-connector-flink-doris</module>
<module>seatunnel-connector-flink-influxdb</module>
<module>seatunnel-connector-flink-clickhouse</module>
+ <module>seatunnel-connector-flink-http</module>
</modules>
</project>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/pom.xml
new file mode 100644
index 00000000..185553ab
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-flink</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-connector-flink-http</artifactId>
+
+ <dependencies>
+ <!-- SeaTunnel Flink API that the connector implements against; declared with provided scope. -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api-flink</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Flink batch, table and streaming APIs. No version or scope is declared here; presumably both are inherited from a parent dependencyManagement section (TODO: confirm). -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <!-- Apache HttpComponents, used by HttpClientUtils to send the HTTP requests. Versions are presumably managed by a parent POM (TODO: confirm). -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
new file mode 100644
index 00000000..fc4d6068
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink batch source that performs one HTTP request and exposes the response body as a
+ * single-row {@link DataSet} with one string column named {@code rawMsg}.
+ *
+ * <p>Options are read in {@link #prepare(FlinkEnvironment)}: {@code url} (required),
+ * {@code method} (defaults to GET), {@code header}, {@code request_params} and
+ * {@code sync_path}. {@code header} and {@code request_params} are JSON objects given as strings.
+ */
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static final Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+    private String url;
+    private String method;
+    private String header;
+    private String requestParams;
+    private String syncPath;
+    private Map<String, String> requestMap;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    /**
+     * Only {@code url} is mandatory; every other option has a default applied in
+     * {@link #prepare(FlinkEnvironment)}.
+     */
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    /**
+     * Reads the plugin options into fields and parses {@code request_params} into the
+     * request parameter map used by {@link #getData(FlinkEnvironment)}.
+     */
+    @Override
+    public void prepare(FlinkEnvironment env) {
+        url = config.getString(Settings.SOURCE_HTTP_URL);
+        method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        requestMap = jsonToMap(requestParams);
+    }
+
+    /**
+     * Sends the configured request and wraps the response body in a one-row data set.
+     *
+     * <p>Parameters read from {@code sync_path} are merged over the configured request
+     * parameters. The method name is compared case-insensitively: {@code POST} sends a POST,
+     * anything else sends a GET.
+     *
+     * @param env Flink environment providing the batch and batch-table environments
+     * @return data set holding the raw response body in column {@code rawMsg}
+     * @throws RuntimeException wrapping any failure raised while performing the HTTP call
+     */
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        // jsonToMap never returns null, and putAll of an empty map is a no-op.
+        requestMap.putAll(jsonToMap(syncValues));
+
+        HttpClientResult response;
+        try {
+            Map<String, String> headerMap = jsonToMap(header);
+            // Case-insensitive so that method = "post" in the config is not silently sent as GET.
+            if (POST.equalsIgnoreCase(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+            throw new RuntimeException(e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        return env.getBatchTableEnvironment().toDataSet(
+            env.getBatchTableEnvironment().fromValues(
+                DataTypes.ROW(DataTypes.FIELD("rawMsg", DataTypes.STRING())),
+                response.getContent()
+            ),
+            Row.class
+        );
+    }
+
+    /**
+     * Returns the first line of the text file at {@code syncPath}, or an empty string when
+     * the path is blank, the file yields no lines, or reading fails.
+     *
+     * <p>Read failures are logged and otherwise ignored (best effort), so a missing sync file
+     * does not stop the job.
+     */
+    private String getSyncValues(ExecutionEnvironment env, String syncPath) {
+        if (null == syncPath || syncPath.isEmpty()) {
+            return "";
+        }
+
+        List<String> values = new ArrayList<>();
+        try {
+            DataSource<String> source = env.readTextFile(syncPath);
+            if (null != source) {
+                values = source.collect();
+            }
+        } catch (Exception e) {
+            LOG.error("getSyncValues error, syncPath is {}", syncPath, e);
+        }
+        return values.isEmpty() ? "" : values.iterator().next();
+    }
+
+    /**
+     * Parses a JSON object into a mutable string-to-string map.
+     *
+     * <p>String values are kept as they are and {@code null} values stay {@code null}. Any
+     * other value (number, boolean, nested object or array) is rendered as its JSON text,
+     * because the HTTP helpers require {@code Map<String, String>} and would otherwise fail
+     * with a {@link ClassCastException} when reading the value.
+     *
+     * @param content JSON text; may be {@code null} or empty
+     * @return the parsed map, or an empty map when the content is blank, is the JSON literal
+     *         {@code null}, or cannot be parsed (the parse error is logged, not rethrown)
+     */
+    private Map<String, String> jsonToMap(String content) {
+        Map<String, String> result = new HashMap<>(INITIAL_CAPACITY);
+        if (null == content || content.isEmpty()) {
+            return result;
+        }
+
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            Map<?, ?> parsed = mapper.readValue(content, HashMap.class);
+            if (parsed == null) {
+                // The JSON literal "null" deserializes to a null reference.
+                return result;
+            }
+            for (Map.Entry<?, ?> entry : parsed.entrySet()) {
+                Object value = entry.getValue();
+                String text;
+                if (value == null || value instanceof String) {
+                    text = (String) value;
+                } else {
+                    text = mapper.writeValueAsString(value);
+                }
+                result.put(String.valueOf(entry.getKey()), text);
+            }
+        } catch (IOException e) {
+            //only records the log, does not handle it, and does not affect the main process.
+            LOG.error("{} json to map error!", content, e);
+            // Drop partially converted entries so a failure still yields an empty map.
+            result.clear();
+        }
+        return result;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/constant/Settings.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/constant/Settings.java
new file mode 100644
index 00000000..27160c11
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/constant/Settings.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.constant;
+
+/**
+ * Option keys understood by the Flink Http source plugin.
+ */
+public class Settings {
+
+    /** Address the request is sent to. */
+    public static final String SOURCE_HTTP_URL = "url";
+
+    /** HTTP method name. */
+    public static final String SOURCE_HTTP_METHOD = "method";
+
+    /** Request headers, written as a JSON object string. */
+    public static final String SOURCE_HTTP_HEADER = "header";
+
+    /** Request parameters, written as a JSON object string. */
+    public static final String SOURCE_HTTP_REQUEST_PARAMS = "request_params";
+
+    /** Path of a text file whose first line holds extra request parameters as JSON. */
+    public static final String SOURCE_HTTP_SYNC_PATH = "sync_path";
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java
new file mode 100644
index 00000000..47bb3812
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import java.io.Serializable;
+
+/**
+ * Immutable-by-convention holder for the outcome of one HTTP call: the status code and the
+ * response body text.
+ */
+public class HttpClientResult implements Serializable {
+
+    private static final long serialVersionUID = 2168152194164783950L;
+
+    /** HTTP status code of the response; 0 when created through the no-arg constructor. */
+    private int code;
+
+    /** Response body text; {@code null} when no body was supplied. */
+    private String content;
+
+    /** Creates a result with status code 0 and no body. */
+    public HttpClientResult() {
+        this(0, null);
+    }
+
+    /**
+     * Creates a result that carries only a status code.
+     *
+     * @param code response status code
+     */
+    public HttpClientResult(final int code) {
+        this(code, null);
+    }
+
+    /**
+     * Creates a result with both status code and body.
+     *
+     * @param code    response status code
+     * @param content response body text
+     */
+    public HttpClientResult(final int code, final String content) {
+        this.code = code;
+        this.content = content;
+    }
+
+    /** @return the response status code */
+    public int getCode() {
+        return this.code;
+    }
+
+    /** @return the response body text, possibly {@code null} */
+    public String getContent() {
+        return this.content;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("HttpClientResult [code=");
+        text.append(code);
+        text.append(", content=");
+        text.append(content);
+        text.append("]");
+        return text.toString();
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java
new file mode 100644
index 00000000..da31a62a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Static helpers for sending blocking HTTP requests with Apache HttpClient.
+ *
+ * <p>Each request method creates its own {@link CloseableHttpClient}, closes it before
+ * returning, and reports the status code and the UTF-8 decoded body as an
+ * {@link HttpClientResult}.
+ */
+public class HttpClientUtils {
+
+    private static final String ENCODING = "UTF-8";
+    /** Connection timeout in milliseconds (12 seconds). */
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    /** Socket (response wait) timeout in milliseconds (60 seconds). */
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    private HttpClientUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception if the url is invalid or the request fails
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map, may be null
+     * @return http response result
+     * @throws Exception if the url is invalid or the request fails
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters. The parameters are
+     * added to the query string of the url.
+     *
+     * @param url     request address
+     * @param headers request header map, may be null
+     * @param params  request parameter map, may be null
+     * @return http response result
+     * @throws Exception if the url is invalid or the request fails
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        // Create access address
+        URIBuilder uriBuilder = new URIBuilder(url);
+        if (params != null) {
+            for (Entry<String, String> entry : params.entrySet()) {
+                uriBuilder.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        httpGet.setConfig(defaultRequestConfig());
+        // set request header
+        packageHeader(headers, httpGet);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpGet);
+        }
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception if the request fails
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map, may be null
+     * @return http response result
+     * @throws Exception if the request fails
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters. The parameters are
+     * sent as a url-encoded form body.
+     *
+     * @param url     request address
+     * @param headers request header map, may be null
+     * @param params  request parameter map, may be null
+     * @return http response result
+     * @throws Exception if the request fails
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        httpPost.setConfig(defaultRequestConfig());
+        // set request header
+        packageHeader(headers, httpPost);
+        // Encapsulate request parameters
+        packageParam(params, httpPost);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpPost);
+        }
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception if the request fails
+     */
+    public static HttpClientResult doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters, sent as a url-encoded form body.
+     *
+     * @param url    request address
+     * @param params request parameter map, may be null
+     * @return http response result
+     * @throws Exception if the request fails
+     */
+    public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
+        HttpPut httpPut = new HttpPut(url);
+        httpPut.setConfig(defaultRequestConfig());
+        packageParam(params, httpPut);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            return getHttpClientResult(httpClient, httpPut);
+        }
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception if the request fails
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doDelete(String url) throws Exception {
+        HttpDelete httpDelete = new HttpDelete(url);
+        httpDelete.setConfig(defaultRequestConfig());
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            return getHttpClientResult(httpClient, httpDelete);
+        }
+    }
+
+    /**
+     * Send delete request with request parameters. The request is sent as a POST whose form
+     * body carries an extra {@code _method=delete} field.
+     *
+     * <p>The caller's map is copied rather than modified, so unmodifiable maps are accepted
+     * and the {@code _method} entry does not leak back to the caller.
+     *
+     * @param url    request address
+     * @param params request parameter map, may be null
+     * @return http response result
+     * @throws Exception if the request fails
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
+        Map<String, String> formParams = new HashMap<>(INITIAL_CAPACITY);
+        if (params != null) {
+            formParams.putAll(params);
+        }
+
+        formParams.put("_method", "delete");
+        return doPost(url, formParams);
+    }
+
+    /**
+     * Copy every entry of the header map onto the request; does nothing for a null map.
+     *
+     * @param params     request header map, may be null
+     * @param httpMethod http request method
+     */
+    public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
+        if (params == null) {
+            return;
+        }
+        for (Entry<String, String> entry : params.entrySet()) {
+            // setHeader replaces an existing header with the same name
+            httpMethod.setHeader(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Set the request parameters as a UTF-8 url-encoded form entity on the request; does
+     * nothing for a null map.
+     *
+     * @param params     request parameter map, may be null
+     * @param httpMethod http request method
+     * @throws UnsupportedEncodingException if the form encoding is not supported
+     */
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
+        if (params == null) {
+            return;
+        }
+        List<NameValuePair> nvps = new ArrayList<>();
+        for (Entry<String, String> entry : params.entrySet()) {
+            nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
+        }
+
+        // Set to the request's http object
+        httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING));
+    }
+
+    /**
+     * Execute the request and convert the response into an {@link HttpClientResult}. The
+     * response is closed before this method returns; the client is left open for the caller.
+     *
+     * @param httpClient http client object
+     * @param httpMethod http method object
+     * @return status code plus body (empty string when the response has no entity), or a
+     *         result with status 500 and no body when the response has no status line
+     * @throws Exception if executing the request or reading the body fails
+     */
+    public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+        // execute request
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
+            }
+        }
+        return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    }
+
+    /**
+     * Close the response and then the client, skipping whichever is null.
+     *
+     * @param httpResponse http response object, may be null
+     * @param httpClient   http client object, may be null
+     * @throws IOException if closing either resource fails
+     */
+    // This method is reserved for future extension to users.
+    public static void release(CloseableHttpResponse httpResponse, CloseableHttpClient httpClient) throws IOException {
+        if (httpResponse != null) {
+            httpResponse.close();
+        }
+        if (httpClient != null) {
+            httpClient.close();
+        }
+    }
+
+    /**
+     * Build the request configuration shared by all request methods: the connection timeout
+     * and the socket timeout (maximum wait for response data), both in milliseconds.
+     */
+    private static RequestConfig defaultRequestConfig() {
+        return RequestConfig.custom()
+            .setConnectTimeout(CONNECT_TIMEOUT)
+            .setSocketTimeout(SOCKET_TIMEOUT)
+            .build();
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
new file mode 100644
index 00000000..c884fff7
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.flink.http.source.Http
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/http/HttpSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/http/HttpSourceToConsoleIT.java
new file mode 100644
index 00000000..f4994341
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/http/HttpSourceToConsoleIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.http;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+/**
+ * End-to-end check that a job reading from the Http source and writing to the console sink
+ * runs to completion.
+ */
+public class HttpSourceToConsoleIT extends FlinkContainer {
+
+    /**
+     * Submits the job described by {@code /http/httpsource_to_console.conf} and expects the
+     * submission command to exit with code 0.
+     */
+    @Test
+    public void testHttpSourceToConsoleSink() throws IOException, InterruptedException {
+        final String jobConfig = "/http/httpsource_to_console.conf";
+        final Container.ExecResult jobResult = executeSeaTunnelFlinkJob(jobConfig);
+        final int exitCode = jobResult.getExitCode();
+        Assert.assertEquals(0, exitCode);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/http/httpsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/http/httpsource_to_console.conf
new file mode 100644
index 00000000..246ba4f3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/http/httpsource_to_console.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+ Http {
+ url = "http://date.jsontest.com/"
+ result_table_name= "response_body"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ ConsoleSink {
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file