You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/06 14:20:30 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector]support flink-connecor-http (#1798)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d50fe1ed [Feature][Connector]support flink-connecor-http (#1798)
d50fe1ed is described below

commit d50fe1ed9fc69f28ab87aae2b1f482f04e19e2c9
Author: tmljob <69...@users.noreply.github.com>
AuthorDate: Fri May 6 22:20:24 2022 +0800

    [Feature][Connector]support flink-connecor-http (#1798)
    
    * support flink-connector-http
    * modify it use-case name
---
 docs/en/connector/source/Http.md                   |   2 +-
 seatunnel-connectors/plugin-mapping.properties     |   1 +
 .../seatunnel-connectors-flink-dist/pom.xml        |   6 +
 .../seatunnel-connectors-flink/pom.xml             |   1 +
 .../seatunnel-connector-flink-http/pom.xml         |  64 +++++
 .../apache/seatunnel/flink/http/source/Http.java   | 152 ++++++++++
 .../flink/http/source/constant/Settings.java       |  30 ++
 .../flink/http/source/util/HttpClientResult.java   |  61 ++++
 .../flink/http/source/util/HttpClientUtils.java    | 318 +++++++++++++++++++++
 .../org.apache.seatunnel.flink.BaseFlinkSource     |  18 ++
 .../e2e/flink/http/HttpSourceToConsoleIT.java      |  35 +++
 .../test/resources/http/httpsource_to_console.conf |  50 ++++
 12 files changed, 737 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector/source/Http.md b/docs/en/connector/source/Http.md
index 84b74c3d..d9d3a399 100644
--- a/docs/en/connector/source/Http.md
+++ b/docs/en/connector/source/Http.md
@@ -9,7 +9,7 @@ Get data from http or https interface
 Engine Supported and plugin name
 
 * [x] Spark: Http
-* [ ] Flink
+* [x] Flink: Http
 
 :::
 
diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties
index 58536d02..619911fe 100644
--- a/seatunnel-connectors/plugin-mapping.properties
+++ b/seatunnel-connectors/plugin-mapping.properties
@@ -27,6 +27,7 @@ flink.source.InfluxDbSource = seatunnel-connector-flink-influxdb
 flink.source.JdbcSource = seatunnel-connector-flink-jdbc
 flink.source.KafkaTableStream = seatunnel-connector-flink-kafka
 flink.source.SocketStream = seatunnel-connector-flink-socket
+flink.source.Http = seatunnel-connector-flink-http
 
 # Flink Sink
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
index 37808da3..0e3ca69d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink-dist/pom.xml
@@ -86,6 +86,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-flink-http</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
index 2d462a99..bb3495dd 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/pom.xml
@@ -43,6 +43,7 @@
         <module>seatunnel-connector-flink-doris</module>
         <module>seatunnel-connector-flink-influxdb</module>
         <module>seatunnel-connector-flink-clickhouse</module>
+        <module>seatunnel-connector-flink-http</module>
     </modules>
 
 </project>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/pom.xml
new file mode 100644
index 00000000..185553ab
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>seatunnel-connectors-flink</artifactId>
+    <groupId>org.apache.seatunnel</groupId>
+    <version>${revision}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>seatunnel-connector-flink-http</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.seatunnel</groupId>
+      <artifactId>seatunnel-api-flink</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpcore</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+    </dependency>
+
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
new file mode 100644
index 00000000..fc4d6068
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+    private String url;
+    private String method;
+    private String header;
+    private String requestParams;
+    private String syncPath;
+    private Map requestMap;
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public void prepare(FlinkEnvironment env) {
+        url = config.getString(Settings.SOURCE_HTTP_URL);
+        method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        requestMap = jsonToMap(requestParams);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+            throw new RuntimeException(e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        return env.getBatchTableEnvironment().toDataSet(
+            env.getBatchTableEnvironment().fromValues(
+                DataTypes.ROW(DataTypes.FIELD("rawMsg", DataTypes.STRING())),
+                response.getContent()
+            ),
+            Row.class
+        );
+    }
+
+    private String getSyncValues(ExecutionEnvironment env, String syncPath) {
+        if (null == syncPath || syncPath.isEmpty()) {
+            return "";
+        }
+
+        List<String> values = new ArrayList<>();
+        try {
+            DataSource<String> source = env.readTextFile(syncPath);
+            if (null != source) {
+                values = source.collect();
+            }
+        } catch (Exception e) {
+            LOG.error("getSyncValues error, syncPath is {}", syncPath, e);
+        }
+        return values.isEmpty() ? "" : values.iterator().next();
+    }
+
+    private Map jsonToMap(String content) {
+        Map map = new HashMap<>(INITIAL_CAPACITY);
+        if (null == content || content.isEmpty()) {
+            return map;
+        }
+
+        try {
+            return new ObjectMapper().readValue(content, HashMap.class);
+        } catch (IOException e) {
+            //only records the log, does not handle it, and does not affect the main process.
+            LOG.error("{} json to map error!", content, e);
+        }
+        return map;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/constant/Settings.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/constant/Settings.java
new file mode 100644
index 00000000..27160c11
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/constant/Settings.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.constant;
+
+public class Settings {
+
+    public static final String SOURCE_HTTP_URL = "url";
+    public static final String SOURCE_HTTP_METHOD = "method";
+    public static final String SOURCE_HTTP_HEADER = "header";
+    public static final String SOURCE_HTTP_REQUEST_PARAMS = "request_params";
+
+    public static final String SOURCE_HTTP_SYNC_PATH = "sync_path";
+
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java
new file mode 100644
index 00000000..47bb3812
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import java.io.Serializable;
+
+public class HttpClientResult implements Serializable {
+
+    private static final long serialVersionUID = 2168152194164783950L;
+
+    /**
+     * response status code
+     */
+    private int code;
+
+    /**
+     * response body
+     */
+    private String content;
+
+    public HttpClientResult() {
+    }
+
+    public HttpClientResult(int code) {
+        this.code = code;
+    }
+
+    public HttpClientResult(int code, String content) {
+        this.code = code;
+        this.content = content;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    @Override
+    public String toString() {
+        return "HttpClientResult [code=" + code + ", content=" + content + "]";
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java
new file mode 100644
index 00000000..da31a62a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class HttpClientUtils {
+
+    private HttpClientUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    private static final String ENCODING = "UTF-8";
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        // Create access address
+        URIBuilder uriBuilder = new URIBuilder(url);
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                uriBuilder.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        httpGet.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // set request header
+            packageHeader(headers, httpGet);
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpGet);
+        }
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPost.setConfig(requestConfig);
+        // set request header
+        packageHeader(headers, httpPost);
+
+        // Encapsulate request parameters
+        packageParam(params, httpPost);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpPost);
+        }
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
+
+        HttpPut httpPut = new HttpPut(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPut.setConfig(requestConfig);
+
+        packageParam(params, httpPut);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpPut);
+        }
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doDelete(String url) throws Exception {
+
+        HttpDelete httpDelete = new HttpDelete(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpDelete.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpDelete);
+        }
+    }
+
+    /**
+     * Send delete request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    // This method is reserved for future extension to users.
+    public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
+        if (params == null) {
+            params = new HashMap<>(INITIAL_CAPACITY);
+        }
+
+        params.put("_method", "delete");
+        return doPost(url, params);
+    }
+
+    /**
+     * encapsulate request header
+     *
+     * @param params     request header map
+     * @param httpMethod http request method
+     */
+    public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
+        // encapsulate request header
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                // Set to the request header to the HttpRequestBase object
+                httpMethod.setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Encapsulate request parameters
+     *
+     * @param params     request parameter map
+     * @param httpMethod http request method
+     * @throws UnsupportedEncodingException exception information
+     */
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
+        // Encapsulate request parameters
+        if (params != null) {
+            List<NameValuePair> nvps = new ArrayList<>();
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
+            }
+
+            // Set to the request's http object
+            httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING));
+        }
+    }
+
+    /**
+     * get response result
+     *
+     * @param httpClient http client object
+     * @param httpMethod http method onject
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+        // execute request
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
+            }
+        }
+        return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    }
+
+    /**
+     * release resources
+     *
+     * @param httpResponse http response object
+     * @param httpClient   http client objet
+     * @throws IOException information
+     */
+    // This method is reserved for future extension to users.
+    public static void release(CloseableHttpResponse httpResponse, CloseableHttpClient httpClient) throws IOException {
+        // release resources
+        if (httpResponse != null) {
+            httpResponse.close();
+        }
+        if (httpClient != null) {
+            httpClient.close();
+        }
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
new file mode 100644
index 00000000..c884fff7
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/resources/META-INF/services/org.apache.seatunnel.flink.BaseFlinkSource
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.flink.http.source.Http
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/http/HttpSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/http/HttpSourceToConsoleIT.java
new file mode 100644
index 00000000..f4994341
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/http/HttpSourceToConsoleIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.http;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class HttpSourceToConsoleIT extends FlinkContainer {
+
+    @Test
+    public void testHttpSourceToConsoleSink() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/http/httpsource_to_console.conf");
+        Assert.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/http/httpsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/http/httpsource_to_console.conf
new file mode 100644
index 00000000..246ba4f3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/http/httpsource_to_console.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    Http {
+      url = "http://date.jsontest.com/"
+      result_table_name= "response_body"
+    }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  ConsoleSink {
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file