Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/05/05 07:32:34 UTC

[GitHub] [incubator-seatunnel] tmljob opened a new pull request, #1798: [Feature][Connector]support flink-connecor-http

tmljob opened a new pull request, #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798

   
   ## Purpose of this pull request
   
   Support flink-connector-http
   
   
   ## Check list
   
   * [X] Code changes are covered with tests, or do not need tests for the following reason:
   * [X] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [X] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r866444199


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class HttpClientUtils {
+
+    private HttpClientUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    private static final String ENCODING = "UTF-8";
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        // Create access address
+        URIBuilder uriBuilder = new URIBuilder(url);
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                uriBuilder.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+
+        /*
+         * setConnectTimeout: the connection timeout, in milliseconds.
+         * setSocketTimeout: the timeout (i.e. response time) for receiving data, in milliseconds.
+         * If the endpoint does not return data within this time, the call is abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        httpGet.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // set request header
+            packageHeader(headers, httpGet);
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpGet);
+        }
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        /*
+         * setConnectTimeout: the connection timeout, in milliseconds.
+         * setSocketTimeout: the timeout (i.e. response time) for receiving data, in milliseconds.
+         * If the endpoint does not return data within this time, the call is abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPost.setConfig(requestConfig);
+        // set request header
+        packageHeader(headers, httpPost);
+
+        // Encapsulate request parameters
+        packageParam(params, httpPost);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpPost);
+        }
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
+
+        HttpPut httpPut = new HttpPut(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPut.setConfig(requestConfig);
+
+        packageParam(params, httpPut);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            return getHttpClientResult(httpClient, httpPut);
+        }
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url) throws Exception {
+
+        HttpDelete httpDelete = new HttpDelete(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpDelete.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            return getHttpClientResult(httpClient, httpDelete);
+        }
+    }
+
+    /**
+     * Send delete request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
+        if (params == null) {
+            params = new HashMap<>(INITIAL_CAPACITY);
+        }
+
+        params.put("_method", "delete");
+        return doPost(url, params);
+    }
+
+    /**
+     * encapsulate request header
+     *
+     * @param params     request header map
+     * @param httpMethod http request method
+     */
+    public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
+        // encapsulate request header
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                // Set to the request header to the HttpRequestBase object
+                httpMethod.setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Encapsulate request parameters
+     *
+     * @param params     request parameter map
+     * @param httpMethod http request method
+     * @throws UnsupportedEncodingException exception information
+     */
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
+        // Encapsulate request parameters
+        if (params != null) {
+            List<NameValuePair> nvps = new ArrayList<>();
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
+            }
+
+            // Set to the request's http object
+            httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING));
+        }
+    }
+
+    /**
+     * get response result
+     *
+     * @param httpClient http client object
+     * @param httpMethod http method object
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+        // execute request
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
+            }
+        }
+        return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    }
+
+    /**
+     * release resources
+     *
+     * @param httpResponse http response object
+     * @param httpClient   http client object
+     * @throws IOException information
+     */
+    public static void release(CloseableHttpResponse httpResponse, CloseableHttpClient httpClient) throws IOException {
+        // release resources
+        if (httpResponse != null) {
+            httpResponse.close();
+        }
+        if (httpClient != null) {
+            httpClient.close();
+        }
+    }

Review Comment:
   I have seen this in `Spark-http`, but I guess it is reserved for users. If so, it would be better to add a comment explaining that.
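
For context on the `release` helper discussed above: the methods in this diff already open the client and the response with try-with-resources, so both are closed without an explicit `release` call. The following is a minimal sketch of that behavior using only the standard library. `Tracked` is a hypothetical stand-in for `CloseableHttpClient` and `CloseableHttpResponse` and is not part of the PR; it only records when `close()` runs.

```java
import java.util.ArrayList;
import java.util.List;

class TryWithResourcesSketch {

    // Hypothetical stand-in for a closeable HTTP client or response;
    // it only records the moment close() is called.
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("closed " + name);
        }
    }

    // Mirrors the nesting in the diff: the client is opened in the outer try,
    // the response in the inner one (as getHttpClientResult does).
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Tracked client = new Tracked("client", log)) {
            try (Tracked response = new Tracked("response", log)) {
                log.add("read body");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints the recorded order of events.
        System.out.println(run());
    }
}
```

The inner resource is closed first, then the outer one, which is the same order `release` would close them in. Under that assumption the helper is only useful to callers that manage the client and response by hand.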





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r866494152


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static final Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http response code->{}", response.getCode());
+        LOG.info("http response body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   I confirmed that parsing JSON through Flink SQL requires a JSON format or a UDF. Adding a separate transform plugin for this can be considered in the future.
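
The reason is that `Row.of(response.getContent())` produces a row with a single string field, so a key inside the JSON body is not a column that SQL can resolve. Below is a toy sketch of that situation using only the standard library. It does not use Flink; the column name `body` and the sample JSON are assumptions made for illustration, since the real field name of the registered table is not shown in this thread.

```java
import java.util.Arrays;
import java.util.List;

class SingleColumnSketch {

    // Hypothetical schema: one column holding the whole response body.
    static final List<String> COLUMNS = Arrays.asList("body");

    // Toy stand-in for SQL column resolution: look the name up in the schema.
    static int resolve(String column) {
        int index = COLUMNS.indexOf(column);
        if (index < 0) {
            throw new IllegalArgumentException("Column '" + column + "' not found");
        }
        return index;
    }

    public static void main(String[] args) {
        // The whole response is one string field, as with Row.of(response.getContent()).
        Object[] row = {"{\"headers\": {\"Host\": \"example.org\"}}"};

        // The single column resolves and yields the raw JSON text.
        System.out.println(row[resolve("body")]);

        // A key inside the JSON is not a column, so resolution fails.
        try {
            resolve("headers");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Getting at `headers` therefore needs a step that parses the string, which is what a JSON format or a UDF would provide.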





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865806524


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static final Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http response code->{}", response.getCode());
+        LOG.info("http response body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   When I tried to process the data through the SQL plugin, the column was not recognized. The exception is as follows:
   ```
    The program finished with the following exception:
   
   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Execute Flink task error
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
           at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
           at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
           at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
           at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
           at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
           at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
           at java.security.AccessController.doPrivileged(Native Method)
           at javax.security.auth.Subject.doAs(Subject.java:422)
           at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
           at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
           at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
   Caused by: java.lang.RuntimeException: Execute Flink task error
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:71)
           at org.apache.seatunnel.core.base.Seatunnel.run(Seatunnel.java:39)
           at org.apache.seatunnel.core.flink.SeatunnelFlink.main(SeatunnelFlink.java:33)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
           ... 11 more
   Caused by: java.lang.Exception: Flink batch transform sql execute failed, SQL: select headers from response_body
           at org.apache.seatunnel.flink.transform.Sql.processBatch(Sql.java:63)
           at org.apache.seatunnel.flink.batch.FlinkBatchExecution.start(FlinkBatchExecution.java:64)
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:68)
           ... 18 more
   Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 14: Column 'headers' not found in any table
           at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:149)
           at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
           at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:152)
           at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:67)
           at org.apache.flink.table.api.internal.TableEnvImpl.sqlQuery(TableEnvImpl.scala:528)
           at org.apache.seatunnel.flink.transform.Sql.processBatch(Sql.java:61)
           ... 20 more
   Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 14: Column 'headers' not found in any table
           at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
           at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
           at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
           at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
           at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
           at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
           at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
           at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
           at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:6015)
           at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6178)
           at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6164)
           at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5600)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:411)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)
           at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
           at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
           at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
           at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:144)
           ... 25 more
   Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'headers' not found in any table
           at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
           at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
           at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
           at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
           at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
           at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
           ... 45 more
   ```
   





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865747561


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);

Review Comment:
   The `prepare` method has no return value. If the code is moved there, `requestMap` has to become a private member variable. Is that necessary? Would the other HTTP request parameters, such as url and method, also need to be handled this way?
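
The trade-off raised here can be sketched in isolation. The following is only an illustration under stated assumptions: `PreparedSourceSketch`, its `prepare` signature and `describeRequest` are hypothetical stand-ins, not the SeaTunnel `Http` class or its plugin interface. It shows that a hook without a return value has to hand its results over through fields, and that url and method would then follow the same pattern as the request map.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the source plugin; not the SeaTunnel class.
final class PreparedSourceSketch {

    // Populated once by prepare() and only read afterwards.
    private String url;
    private String method;
    private Map<String, String> requestParams = Collections.emptyMap();

    // A void lifecycle hook cannot return values, so it stores them in fields.
    void prepare(String url, String method, Map<String, String> requestParams) {
        this.url = url;
        // Fall back to GET when no method is configured.
        this.method = (method == null || method.isEmpty()) ? "GET" : method;
        this.requestParams = requestParams == null
                ? Collections.<String, String>emptyMap()
                : Collections.unmodifiableMap(new LinkedHashMap<>(requestParams));
    }

    // Stands in for getData(): it uses the prepared fields without re-reading the config.
    String describeRequest() {
        return method + " " + url + " " + requestParams;
    }
}
```

With this shape the data-producing method no longer touches the configuration at all, at the cost of a few extra fields on the plugin instance.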





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865683157


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import java.io.Serializable;
+
+public class HttpClientResult implements Serializable {
+
+    private static final long serialVersionUID = 2168152194164783950L;
+
+    /**
+     * response status code
+     */
+    private int code;
+
+    /**
+     * response body
+     */
+    private String content;
+
+    public HttpClientResult() {
+    }
+
+    public HttpClientResult(int code) {
+        this.code = code;
+    }
+
+    public HttpClientResult(String content) {
+        this.content = content;
+    }

Review Comment:
   These three constructors are unused.



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);

Review Comment:
   It's better to put this config-related code in the `prepare` method, and to throw an exception when parsing `requestParams` fails.
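
A minimal sketch of the fail-fast behaviour suggested here, under loud assumptions: `RequestParamsParserSketch` and `parseOrThrow` are hypothetical names, and the `key=value;key=value` input format is only a dependency-free stand-in for the JSON string that the PR parses. The point is the control flow: a malformed value stops the job during preparation with a descriptive exception instead of surfacing later as an empty or partial request.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; the input format is a stand-in for the PR's JSON parameters.
final class RequestParamsParserSketch {

    private RequestParamsParserSketch() {
    }

    // Parses "k1=v1;k2=v2" and throws on the first malformed entry.
    static Map<String, String> parseOrThrow(String raw) {
        Map<String, String> result = new LinkedHashMap<>();
        if (raw == null || raw.trim().isEmpty()) {
            // An absent option simply means "no request parameters".
            return result;
        }
        for (String pair : raw.split(";")) {
            int idx = pair.indexOf('=');
            if (idx <= 0) {
                // Fail fast with enough context to locate the bad config value.
                throw new IllegalArgumentException(
                        "Invalid request parameter entry '" + pair + "' in '" + raw + "'");
            }
            result.put(pair.substring(0, idx).trim(), pair.substring(idx + 1).trim());
        }
        return result;
    }
}
```

Calling such a helper from the preparation step would mean a typo in the parameters is reported before any HTTP request is sent.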



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientResult.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import java.io.Serializable;
+
+public class HttpClientResult implements Serializable {
+
+    private static final long serialVersionUID = 2168152194164783950L;
+
+    /**
+     * response status code
+     */
+    private int code;
+
+    /**
+     * response body
+     */
+    private String content;
+
+    public HttpClientResult() {
+    }
+
+    public HttpClientResult(int code) {
+        this.code = code;
+    }
+
+    public HttpClientResult(String content) {
+        this.content = content;
+    }
+
+    public HttpClientResult(int code, String content) {
+        this.code = code;
+        this.content = content;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getContent() {
+        return content;
+    }
+
+    public void setContent(String content) {
+        this.content = content;
+    }

Review Comment:
   Maybe we don't need to provide the setter methods.
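
One way to read this suggestion, sketched with a hypothetical class name (`ImmutableHttpResultSketch` is not part of the PR): keep a single constructor, make the fields `final`, and expose getters only. Note that the quoted `getHttpClientResult` fallback constructs a result from a status code alone, so under this shape that call would have to pass an empty body explicitly.

```java
import java.io.Serializable;

// Hypothetical immutable variant of the result holder; not the PR's class.
final class ImmutableHttpResultSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // Both fields are assigned exactly once, in the constructor.
    private final int code;
    private final String content;

    ImmutableHttpResultSketch(int code, String content) {
        this.code = code;
        this.content = content;
    }

    int getCode() {
        return code;
    }

    String getContent() {
        return content;
    }
}
```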



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class HttpClientUtils {
+
+    private HttpClientUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    private static final String ENCODING = "UTF-8";
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        // Create access address
+        URIBuilder uriBuilder = new URIBuilder(url);
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                uriBuilder.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        httpGet.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // set request header
+            packageHeader(headers, httpGet);
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpGet);
+        }
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPost.setConfig(requestConfig);
+        // set request header
+        packageHeader(headers, httpPost);
+
+        // Encapsulate request parameters
+        packageParam(params, httpPost);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpPost);
+        }
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
+
+        HttpPut httpPut = new HttpPut(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPut.setConfig(requestConfig);
+
+        packageParam(params, httpPut);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpPut);
+        }
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url) throws Exception {
+
+        HttpDelete httpDelete = new HttpDelete(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpDelete.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpDelete);
+        }
+    }
+
+    /**
+     * Send delete request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
+        if (params == null) {
+            params = new HashMap<>(INITIAL_CAPACITY);
+        }
+
+        params.put("_method", "delete");
+        return doPost(url, params);
+    }
+
+    /**
+     * encapsulate request header
+     *
+     * @param params     request header map
+     * @param httpMethod http request method
+     */
+    public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
+        // encapsulate request header
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                // Set to the request header to the HttpRequestBase object
+                httpMethod.setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Encapsulate request parameters
+     *
+     * @param params     request parameter map
+     * @param httpMethod http request method
+     * @throws UnsupportedEncodingException exception information
+     */
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
+        // Encapsulate request parameters
+        if (params != null) {
+            List<NameValuePair> nvps = new ArrayList<>();
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
+            }
+
+            // Set to the request's http object
+            httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING));
+        }
+    }
+
+    /**
+     * get response result
+     *
+     * @param httpClient http client object
+     * @param httpMethod http method object
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+        // execute request
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
+            }
+        }
+        return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    }
+
+    /**
+     * release resources
+     *
+     * @param httpResponse http response object
+     * @param httpClient   http client object
+     * @throws IOException information
+     */
+    public static void release(CloseableHttpResponse httpResponse, CloseableHttpClient httpClient) throws IOException {
+        // release resources
+        if (httpResponse != null) {
+            httpResponse.close();
+        }
+        if (httpClient != null) {
+            httpClient.close();
+        }
+    }

Review Comment:
   Remove the unused method.
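
A likely reason the helper is dead code, offered as an interpretation rather than the reviewer's stated rationale: the quoted methods already open the client and the response in try-with-resources blocks, which close them automatically. The sketch below uses a hypothetical `TrackedResource` in place of the real HTTP client and response types to show that behaviour, including the reverse closing order.

```java
import java.util.ArrayList;
import java.util.List;

final class TryWithResourcesSketch {

    // Hypothetical resource that records when it is closed.
    static final class TrackedResource implements AutoCloseable {
        private final String name;
        private final List<String> closeLog;

        TrackedResource(String name, List<String> closeLog) {
            this.name = name;
            this.closeLog = closeLog;
        }

        @Override
        public void close() {
            closeLog.add(name);
        }
    }

    // Returns the order in which the two resources were closed.
    static List<String> runRequest() {
        List<String> closeLog = new ArrayList<>();
        try (TrackedResource client = new TrackedResource("client", closeLog);
             TrackedResource response = new TrackedResource("response", closeLog)) {
            // The request would be executed and the body read here.
        }
        // Resources are closed in reverse declaration order: response, then client.
        return closeLog;
    }
}
```

Because the language construct guarantees the cleanup, an explicit release helper adds nothing unless some caller manages the objects by hand.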



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);

Review Comment:
   Is this log only used for debugging?



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class HttpClientUtils {
+
+    private HttpClientUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    private static final String ENCODING = "UTF-8";
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        // Create access address
+        URIBuilder uriBuilder = new URIBuilder(url);
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                uriBuilder.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        httpGet.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // set request header
+            packageHeader(headers, httpGet);
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpGet);
+        }
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPost.setConfig(requestConfig);
+        // set request header
+        packageHeader(headers, httpPost);
+
+        // Encapsulate request parameters
+        packageParam(params, httpPost);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpPost);
+        }
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
+
+        HttpPut httpPut = new HttpPut(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPut.setConfig(requestConfig);
+
+        packageParam(params, httpPut);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpPut);
+        }
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url) throws Exception {
+
+        HttpDelete httpDelete = new HttpDelete(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpDelete.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpDelete);
+        }
+    }
+
+    /**
+     * Send delete request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
+        if (params == null) {
+            params = new HashMap<>(INITIAL_CAPACITY);
+        }
+
+        params.put("_method", "delete");
+        return doPost(url, params);
+    }
+
+    /**
+     * encapsulate request header
+     *
+     * @param params     request header map
+     * @param httpMethod http request method
+     */
+    public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
+        // encapsulate request header
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                // Set to the request header to the HttpRequestBase object
+                httpMethod.setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Encapsulate request parameters
+     *
+     * @param params     request parameter map
+     * @param httpMethod http request method
+     * @throws UnsupportedEncodingException exception information
+     */
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
+        // Encapsulate request parameters
+        if (params != null) {
+            List<NameValuePair> nvps = new ArrayList<>();
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
+            }
+
+            // Set to the request's http object
+            httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING));
+        }
+    }
+
+    /**
+     * get response result
+     *
+     * @param httpClient http client object
+     * @param httpMethod http method object
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+        // execute request
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
+            }
+        }
+        return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);

Review Comment:
   This line only exists to make the code compile, so you can add a comment here and return null instead; then you can remove this constructor.



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   Do we need to use the `getBatchTableEnvironment` Table API to add the data? If we use `env.getBatchEnvironment().fromElements(row);` here, it seems we cannot use the SQL transformer.



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);

Review Comment:
   You need to throw this exception.
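
   A minimal sketch of what rethrowing could look like, using only the JDK. `RethrowSketch` and `fetchOrFail` are hypothetical names, and the `Callable` stands in for the `HttpClientUtils.doGet`/`doPost` call; whether a `RuntimeException` is the right type for the source API is an assumption.

   ```java
   import java.util.concurrent.Callable;

   // Hypothetical stand-in for the HTTP source; only the error-handling shape matters here.
   final class RethrowSketch {

       // Runs the call and wraps any exception instead of logging it and continuing.
       static String fetchOrFail(Callable<String> httpCall) {
           try {
               return httpCall.call();
           } catch (Exception e) {
               // Keep the original exception as the cause so its stack trace survives.
               throw new RuntimeException("http call error!", e);
           }
       }

       public static void main(String[] args) {
           // The lambda plays the role of a successful HTTP call returning a body.
           System.out.println(fetchOrFail(() -> "{\"ok\":true}"));
       }
   }
   ```

   With this shape, a failed request stops the job instead of producing a row built from an empty `HttpClientResult`.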





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865806524


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   When I tried to process the data through the sql plugin, the `headers` column was not recognized. The exception is as follows:
   ```
    The program finished with the following exception:
   
   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Execute Flink task error
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
           at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
           at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
           at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
           at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
           at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
           at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
           at java.security.AccessController.doPrivileged(Native Method)
           at javax.security.auth.Subject.doAs(Subject.java:422)
           at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
           at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
           at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
   Caused by: java.lang.RuntimeException: Execute Flink task error
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:71)
           at org.apache.seatunnel.core.base.Seatunnel.run(Seatunnel.java:39)
           at org.apache.seatunnel.core.flink.SeatunnelFlink.main(SeatunnelFlink.java:33)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
           ... 11 more
   Caused by: java.lang.Exception: Flink batch transform sql execute failed, SQL: select headers from response_body
           at org.apache.seatunnel.flink.transform.Sql.processBatch(Sql.java:63)
           at org.apache.seatunnel.flink.batch.FlinkBatchExecution.start(FlinkBatchExecution.java:64)
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:68)
           ... 18 more
   Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 14: Column 'headers' not found in any table
           at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:149)
           at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
           at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:152)
           at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:67)
           at org.apache.flink.table.api.internal.TableEnvImpl.sqlQuery(TableEnvImpl.scala:528)
           at org.apache.seatunnel.flink.transform.Sql.processBatch(Sql.java:61)
           ... 20 more
   Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 14: Column 'headers' not found in any table
           at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
           at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
           at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
           at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
           at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
           at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
           at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
           at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
           at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:6015)
           at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6178)
           at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6164)
           at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5600)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:411)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)
           at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
           at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
           at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
           at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:144)
           ... 25 more
   Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'headers' not found in any table
           at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
           at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
           at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
           at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
           at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
           at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
           ... 45 more
   ```





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865924790


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   > I suggest using `env.getBatchTableEnvironment().toDataSet()` to define the field name.
   
   Because the response body returned by the HTTP call is variable and not known in advance, there seems to be no way to predefine the field names. After I changed it to the following, an error is reported even when there is no transform step.
   
   **code**
   ```
   env.getBatchTableEnvironment().toDataSet(
               env.getBatchTableEnvironment().from(response.getContent()),
               Row.class
           );
   ```
   **error log**
   ```
   21:20:47.073 [main] ERROR org.apache.seatunnel.core.base.Seatunnel -
   
   ===============================================================================
   
   
   21:20:47.078 [main] ERROR org.apache.seatunnel.core.base.Seatunnel - Fatal Error,
   
   21:20:47.078 [main] ERROR org.apache.seatunnel.core.base.Seatunnel - Please submit bug report in https://github.com/apache/incubator-seatunnel/issues
   
   21:20:47.080 [main] ERROR org.apache.seatunnel.core.base.Seatunnel - Reason:Execute Flink task error
   
   21:20:47.082 [main] ERROR org.apache.seatunnel.core.base.Seatunnel - Exception StackTrace:java.lang.RuntimeException: Execute Flink task error
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:71)
           at org.apache.seatunnel.core.base.Seatunnel.run(Seatunnel.java:39)
           at org.apache.seatunnel.core.flink.SeatunnelFlink.main(SeatunnelFlink.java:33)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
           at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
           at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
           at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
           at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
           at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
           at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
           at java.security.AccessController.doPrivileged(Native Method)
           at javax.security.auth.Subject.doAs(Subject.java:422)
           at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
           at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
           at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
   Caused by: org.apache.flink.table.api.SqlParserException: Invalid SQL identifier {
     "gzipped": true,
     "headers": {
       "Accept-Encoding": "gzip,deflate",
       "Host": "httpbin.org",
       "User-Agent": "Apache-HttpClient/4.5.3 (Java/1.8.0_181)",
       "X-Amzn-Trace-Id": "Root=1-6273cf2e-75e23ad14971e7ad7ec2e4e1"
     },
     "method": "GET",
     "origin": "103.25.65.102"
   }
   .
           at org.apache.flink.table.calcite.CalciteParser.parseIdentifier(CalciteParser.java:70)
           at org.apache.flink.table.api.internal.TableEnvImpl.from(TableEnvImpl.scala:437)
           at org.apache.seatunnel.flink.http.source.Http.getData(Http.java:102)
           at org.apache.seatunnel.flink.batch.FlinkBatchExecution.start(FlinkBatchExecution.java:55)
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:68)
           ... 18 more
   
   21:20:47.082 [main] ERROR org.apache.seatunnel.core.base.Seatunnel -
   ===============================================================================
   ```
   
   Can you suggest a solution here? 
   
   





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865985154


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   You can put the whole content into one field.
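   
   A minimal sketch of that idea, assuming nothing about the real Flink or SeaTunnel types: `NamedRow`, `wrapBody`, and the field name `content` are hypothetical stand-ins chosen for illustration. The point is only that the schema is a single, fixed string field, so it never has to be derived from the response body.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   
   public class SingleFieldRowSketch {
   
       /** Hypothetical stand-in for a row plus its schema; not a Flink or SeaTunnel type. */
       public static final class NamedRow {
           public final List<String> fieldNames;
           public final List<Object> values;
   
           NamedRow(List<String> fieldNames, List<Object> values) {
               if (fieldNames.size() != values.size()) {
                   throw new IllegalArgumentException("schema and values differ in length");
               }
               this.fieldNames = fieldNames;
               this.values = values;
           }
       }
   
       // Assumed field name; the schema is fixed up front and independent of the body.
       public static final String CONTENT_FIELD = "content";
   
       /** Wraps the raw response body, unparsed, as the only field of the row. */
       public static NamedRow wrapBody(String responseBody) {
           return new NamedRow(
                   Collections.singletonList(CONTENT_FIELD),
                   Collections.<Object>singletonList(responseBody));
       }
   
       public static void main(String[] args) {
           NamedRow row = wrapBody("{\"method\": \"GET\", \"origin\": \"103.25.65.102\"}");
           System.out.println(row.fieldNames + " -> " + row.values.get(0));
       }
   }
   ```
   
   With this shape, any parsing of the JSON body would be left to a later transform step rather than done in the source.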





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865924790


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   > I suggest using `env.getBatchTableEnvironment().toDataSet()` to define the field name.
   
   Because the structure of the response body returned by the HTTP call is not defined in advance, I don't see a way to predefine the field names. After I changed the code as shown below, an error is reported even when no transform is configured.
   
   **code**
   ```
   env.getBatchTableEnvironment().toDataSet(
               env.getBatchTableEnvironment().from(response.getContent()),
               Row.class
           );
   ```
   **error log**
   ```
   21:20:47.073 [main] ERROR org.apache.seatunnel.core.base.Seatunnel -
   
   ===============================================================================
   
   
   21:20:47.078 [main] ERROR org.apache.seatunnel.core.base.Seatunnel - Fatal Error,
   
   21:20:47.078 [main] ERROR org.apache.seatunnel.core.base.Seatunnel - Please submit bug report in https://github.com/apache/incubator-seatunnel/issues
   
   21:20:47.080 [main] ERROR org.apache.seatunnel.core.base.Seatunnel - Reason:Execute Flink task error
   
   21:20:47.082 [main] ERROR org.apache.seatunnel.core.base.Seatunnel - Exception StackTrace:java.lang.RuntimeException: Execute Flink task error
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:71)
           at org.apache.seatunnel.core.base.Seatunnel.run(Seatunnel.java:39)
           at org.apache.seatunnel.core.flink.SeatunnelFlink.main(SeatunnelFlink.java:33)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
           at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
           at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
           at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
           at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
           at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
           at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
           at java.security.AccessController.doPrivileged(Native Method)
           at javax.security.auth.Subject.doAs(Subject.java:422)
           at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
           at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
           at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
   Caused by: org.apache.flink.table.api.SqlParserException: Invalid SQL identifier {
     "gzipped": true,
     "headers": {
       "Accept-Encoding": "gzip,deflate",
       "Host": "httpbin.org",
       "User-Agent": "Apache-HttpClient/4.5.3 (Java/1.8.0_181)",
       "X-Amzn-Trace-Id": "Root=1-6273cf2e-75e23ad14971e7ad7ec2e4e1"
     },
     "method": "GET",
     "origin": "103.25.65.102"
   }
   .
           at org.apache.flink.table.calcite.CalciteParser.parseIdentifier(CalciteParser.java:70)
           at org.apache.flink.table.api.internal.TableEnvImpl.from(TableEnvImpl.scala:437)
           at org.apache.seatunnel.flink.http.source.Http.getData(Http.java:102)
           at org.apache.seatunnel.flink.batch.FlinkBatchExecution.start(FlinkBatchExecution.java:55)
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:68)
           ... 18 more
   
   21:20:47.082 [main] ERROR org.apache.seatunnel.core.base.Seatunnel -
   ===============================================================================
   ```
   Can you suggest a solution here?
   





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865787640


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);

Review Comment:
   Yes, defining it as a private member is OK; the `prepare` method is meant for initializing it.
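   
   A rough sketch of that split, under stated assumptions: a plain `Map<String, String>` stands in for the real `Config`, and `HttpSourceSketch`, `describeRequest`, and the keys `url` and `method` are hypothetical names used only for illustration. Values are parsed once in `prepare()` into private members and only read afterwards.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class PrepareSketch {
   
       /** Hypothetical source; a plain Map stands in for the real Config type. */
       public static final class HttpSourceSketch {
           private static final String GET = "GET";
   
           private final Map<String, String> config;
   
           // Private members, filled once in prepare() and only read later.
           private String url;
           private String method;
   
           public HttpSourceSketch(Map<String, String> config) {
               this.config = config;
           }
   
           /** Parses the configuration once, before any data is requested. */
           public void prepare() {
               String configuredUrl = config.get("url");
               if (configuredUrl == null) {
                   throw new IllegalArgumentException("missing required key: url");
               }
               url = configuredUrl;
               method = config.getOrDefault("method", GET);
           }
   
           /** Stands in for getData(): uses the prepared members, does no parsing. */
           public String describeRequest() {
               if (url == null) {
                   throw new IllegalStateException("prepare() must run first");
               }
               return method + " " + url;
           }
       }
   
       public static void main(String[] args) {
           Map<String, String> config = new HashMap<>();
           config.put("url", "http://httpbin.org/get");
           HttpSourceSketch source = new HttpSourceSketch(config);
           source.prepare();
           System.out.println(source.describeRequest());
       }
   }
   ```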





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865788543


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   Have you tried this plugin? How do you get the data from your source plugin?





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865787872


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   OK!!!





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r866877283


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class HttpClientUtils {
+
+    private HttpClientUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    private static final String ENCODING = "UTF-8";
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        // Create access address
+        URIBuilder uriBuilder = new URIBuilder(url);
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                uriBuilder.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        httpGet.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // set request header
+            packageHeader(headers, httpGet);
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpGet);
+        }
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPost.setConfig(requestConfig);
+        // set request header
+        packageHeader(headers, httpPost);
+
+        // Encapsulate request parameters
+        packageParam(params, httpPost);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpPost);
+        }
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
+
+        HttpPut httpPut = new HttpPut(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPut.setConfig(requestConfig);
+
+        packageParam(params, httpPut);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpPut);
+        }
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url) throws Exception {
+
+        HttpDelete httpDelete = new HttpDelete(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpDelete.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpDelete);
+        }
+    }
+
+    /**
+     * Send delete request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
+        if (params == null) {
+            params = new HashMap<>(INITIAL_CAPACITY);
+        }
+
+        params.put("_method", "delete");
+        return doPost(url, params);
+    }
+
+    /**
+     * encapsulate request header
+     *
+     * @param params     request header map
+     * @param httpMethod http request method
+     */
+    public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
+        // encapsulate request header
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                // Set to the request header to the HttpRequestBase object
+                httpMethod.setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Encapsulate request parameters
+     *
+     * @param params     request parameter map
+     * @param httpMethod http request method
+     * @throws UnsupportedEncodingException exception information
+     */
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
+        // Encapsulate request parameters
+        if (params != null) {
+            List<NameValuePair> nvps = new ArrayList<>();
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
+            }
+
+            // Set to the request's http object
+            httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING));
+        }
+    }
+
+    /**
+     * get response result
+     *
+     * @param httpClient http client object
+     * @param httpMethod http method onject
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+        // execute request
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
+            }
+        }
+        return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);

Review Comment:
   In this case, we can just return null, since line 292 will never be reached, right? Anyway, this is not a problem; I just want to remove the constructor `HttpClientResult(int code)`.
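
A minimal sketch of the suggestion above, not the code that was merged. It assumes the result class keeps only its two-argument constructor and that the conversion returns null when there is no response, instead of building a synthetic 500 result. `HttpClientResultSketch`, `Response`, and `toResult` are hypothetical names; `Response` stands in for the HttpClient response type so the example runs without external dependencies.

```java
// Sketch only: Response is a hypothetical stand-in for the HttpClient response type.
final class HttpClientResultSketch {

    /** Result holder that keeps only the (code, content) constructor. */
    static final class HttpClientResult {
        private final int code;
        private final String content;

        HttpClientResult(int code, String content) {
            this.code = code;
            this.content = content;
        }

        int getCode() {
            return code;
        }

        String getContent() {
            return content;
        }
    }

    /** Hypothetical minimal view of an HTTP response. */
    interface Response {
        int statusCode();

        String body(); // null when the response carries no entity
    }

    /** Converts a response into a result; returns null rather than a synthetic 500 result. */
    static HttpClientResult toResult(Response response) {
        if (response == null) {
            return null;
        }
        // An absent body becomes an empty string, as in the reviewed method.
        String content = response.body() == null ? "" : response.body();
        return new HttpClientResult(response.statusCode(), content);
    }

    public static void main(String[] args) {
        Response ok = new Response() {
            @Override
            public int statusCode() {
                return 200;
            }

            @Override
            public String body() {
                return "{\"method\": \"GET\"}";
            }
        };
        HttpClientResult result = toResult(ok);
        System.out.println(result.getCode() + " " + result.getContent());
    }
}
```

With this shape, a caller such as `getData` would need a null check before reading the code or content.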





[GitHub] [incubator-seatunnel] ruanwenjun merged pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
ruanwenjun merged PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798




[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r866444773


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   > You can put the whole content into one field.
   
   I tried it. Even so, I can't query the fields in the JSON through SQL in the SQL transform plugin. Does this require a predefined JSON schema when executing Flink SQL? (The Spark engine does not have this problem.)
   **code**
   ```
    return env.getBatchTableEnvironment().toDataSet(
               env.getBatchTableEnvironment().fromValues(
                   DataTypes.ROW(DataTypes.FIELD("rawMsg", DataTypes.STRING())),
                   response.getContent()
               ),
               Row.class
           );
   ```
   **Json**
   ```
   {
     "gzipped": true, 
     "headers": {
       "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", 
       "Accept-Encoding": "gzip, deflate", 
       "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7", 
       "Host": "httpbin.org", 
       "Upgrade-Insecure-Requests": "1", 
       "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36", 
       "X-Amzn-Trace-Id": "Root=1-627485d8-7e392fe6768038d40483a571"
     }, 
     "method": "GET", 
     "origin": "101.93.240.152"
   }
   ```
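
One hedged reading of the snippet above: the row type passed to `fromValues` declares a single STRING field named `rawMsg`, so the resulting table has exactly one column and the JSON keys stay inside that string value. The toy lookup below is not Flink code. `ColumnLookupSketch` and `resolve` are hypothetical names that only model resolving a column name against the declared schema, and the exact-match comparison is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: resolves a column name against the declared schema, nothing else.
final class ColumnLookupSketch {

    /** Returns the index of the column in the declared schema, or -1 if it is not declared. */
    static int resolve(List<String> schema, String column) {
        for (int i = 0; i < schema.size(); i++) {
            if (schema.get(i).equals(column)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // The only declared column is the one holding the whole response body.
        List<String> schema = Arrays.asList("rawMsg");

        System.out.println("rawMsg  -> " + resolve(schema, "rawMsg"));
        // "headers" is a key inside the JSON string, not a declared column.
        System.out.println("headers -> " + resolve(schema, "headers"));
    }
}
```

Under this reading, selecting `headers` would require either declaring it as a column when the table is built or extracting it from `rawMsg` in a later step.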
   
   **error**
   ```
    The program finished with the following exception:
   
   org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Execute Flink task error
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
           at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
           at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
           at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
           at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
           at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
           at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
           at java.security.AccessController.doPrivileged(Native Method)
           at javax.security.auth.Subject.doAs(Subject.java:422)
           at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
           at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
           at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
   Caused by: java.lang.RuntimeException: Execute Flink task error
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:71)
           at org.apache.seatunnel.core.base.Seatunnel.run(Seatunnel.java:39)
           at org.apache.seatunnel.core.flink.SeatunnelFlink.main(SeatunnelFlink.java:33)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
           ... 11 more
   Caused by: java.lang.Exception: Flink batch transform sql execute failed, SQL: select headers from rawmsg
           at org.apache.seatunnel.flink.transform.Sql.processBatch(Sql.java:63)
           at org.apache.seatunnel.flink.batch.FlinkBatchExecution.start(FlinkBatchExecution.java:64)
           at org.apache.seatunnel.core.flink.command.FlinkTaskExecuteCommand.execute(FlinkTaskExecuteCommand.java:68)
           ... 18 more
   Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 14: Column 'headers' not found in any table
           at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:149)
           at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
           at org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:152)
           at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:67)
           at org.apache.flink.table.api.internal.TableEnvImpl.sqlQuery(TableEnvImpl.scala:528)
           at org.apache.seatunnel.flink.transform.Sql.processBatch(Sql.java:61)
           ... 20 more
   Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 14: Column 'headers' not found in any table
           at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
           at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
           at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
           at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
           at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
           at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
           at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
           at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:259)
           at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:6015)
           at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6178)
           at org.apache.calcite.sql.validate.SqlValidatorImpl$SelectExpander.visit(SqlValidatorImpl.java:6164)
           at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:320)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectExpr(SqlValidatorImpl.java:5600)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:411)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)
           at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
           at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
           at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
           at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
           at org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:144)
           ... 25 more
   Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'headers' not found in any table
           at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
           at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
           at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
           at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
           at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
           at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
           ... 45 more
   
   ```





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r866523846


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class HttpClientUtils {
+
+    private HttpClientUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    private static final String ENCODING = "UTF-8";
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        // Create access address
+        URIBuilder uriBuilder = new URIBuilder(url);
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                uriBuilder.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        httpGet.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // set request header
+            packageHeader(headers, httpGet);
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpGet);
+        }
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPost.setConfig(requestConfig);
+        // set request header
+        packageHeader(headers, httpPost);
+
+        // Encapsulate request parameters
+        packageParam(params, httpPost);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpPost);
+        }
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
+
+        HttpPut httpPut = new HttpPut(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPut.setConfig(requestConfig);
+
+        packageParam(params, httpPut);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpPut);
+        }
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url) throws Exception {
+
+        HttpDelete httpDelete = new HttpDelete(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpDelete.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpDelete);
+        }
+    }
+
+    /**
+     * Send delete request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
+        if (params == null) {
+            params = new HashMap<>(INITIAL_CAPACITY);
+        }
+
+        params.put("_method", "delete");
+        return doPost(url, params);
+    }
+
+    /**
+     * encapsulate request header
+     *
+     * @param params     request header map
+     * @param httpMethod http request method
+     */
+    public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
+        // encapsulate request header
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                // Set to the request header to the HttpRequestBase object
+                httpMethod.setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Encapsulate request parameters
+     *
+     * @param params     request parameter map
+     * @param httpMethod http request method
+     * @throws UnsupportedEncodingException exception information
+     */
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
+        // Encapsulate request parameters
+        if (params != null) {
+            List<NameValuePair> nvps = new ArrayList<>();
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
+            }
+
+            // Set to the request's http object
+            httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING));
+        }
+    }
+
+    /**
+     * get response result
+     *
+     * @param httpClient http client object
+     * @param httpMethod http method object
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+        // execute request
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
+            }
+        }
+        return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);

Review Comment:
   Is it better to return a concrete object than to return null?
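
To make the trade-off concrete, here is a minimal sketch, not the PR's actual code. `ResultSketch` is a hypothetical stand-in for `HttpClientResult`, and its fields, the `of` factory, and the empty-string default are assumptions made for illustration. The idea is that the fallback path returns a populated object with status 500, so a caller such as `Http.getData` can log `getCode()` and `getContent()` without a null check.

```java
/** Hypothetical stand-in for the PR's HttpClientResult; names and fields are assumptions. */
final class ResultSketch {
    private static final int INTERNAL_SERVER_ERROR = 500;

    private final int code;
    private final String content;

    ResultSketch(int code, String content) {
        this.code = code;
        this.content = content;
    }

    int getCode() {
        return code;
    }

    String getContent() {
        return content;
    }

    /**
     * Builds a result from a possibly missing status code and body.
     * Never returns null: a missing status maps to 500 with empty content.
     */
    static ResultSketch of(Integer statusCode, String body) {
        if (statusCode == null) {
            // No usable response: return a concrete fallback object instead of null.
            return new ResultSketch(INTERNAL_SERVER_ERROR, "");
        }
        // A missing body is normalized to an empty string.
        return new ResultSketch(statusCode, body == null ? "" : body);
    }

    public static void main(String[] args) {
        ResultSketch fallback = ResultSketch.of(null, null);
        // The caller can read both fields directly, with no null guard.
        System.out.println(fallback.getCode() + " [" + fallback.getContent() + "]");
    }
}
```

The cost of this choice is that a transport failure and a genuine 500 from the server look the same to the caller, so the caller may still want to log the exception separately.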





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r866496482


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/util/HttpClientUtils.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source.util;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class HttpClientUtils {
+
+    private HttpClientUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    private static final String ENCODING = "UTF-8";
+    private static final int CONNECT_TIMEOUT = 6000 * 2;
+    private static final int SOCKET_TIMEOUT = 6000 * 10;
+    private static final int INITIAL_CAPACITY = 16;
+
+    /**
+     * Send a get request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url) throws Exception {
+        return doGet(url, null, null);
+    }
+
+    /**
+     * Send a get request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> params) throws Exception {
+        return doGet(url, null, params);
+    }
+
+    /**
+     * Send a get request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doGet(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        // Create access address
+        URIBuilder uriBuilder = new URIBuilder(url);
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                uriBuilder.setParameter(entry.getKey(), entry.getValue());
+            }
+        }
+
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        HttpGet httpGet = new HttpGet(uriBuilder.build());
+        httpGet.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            // set request header
+            packageHeader(headers, httpGet);
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpGet);
+        }
+    }
+
+    /**
+     * Send a post request without request headers and request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url) throws Exception {
+        return doPost(url, null, null);
+    }
+
+    /**
+     * Send post request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> params) throws Exception {
+        return doPost(url, null, params);
+    }
+
+    /**
+     * Send a post request with request headers and request parameters
+     *
+     * @param url     request address
+     * @param headers request header map
+     * @param params  request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPost(String url, Map<String, String> headers, Map<String, String> params) throws Exception {
+        HttpPost httpPost = new HttpPost(url);
+        /**
+         * setConnectTimeout:Set the connection timeout, in milliseconds.
+         * setSocketTimeout:The timeout period (ie response time) for requesting data acquisition, in milliseconds.
+         * If an interface is accessed, and the data cannot be returned within a certain amount of time, the call is simply abandoned.
+         */
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPost.setConfig(requestConfig);
+        // set request header
+        packageHeader(headers, httpPost);
+
+        // Encapsulate request parameters
+        packageParam(params, httpPost);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            // Execute the request and get the response result
+            return getHttpClientResult(httpClient, httpPost);
+        }
+    }
+
+    /**
+     * Send a put request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url) throws Exception {
+        return doPut(url, null);
+    }
+
+    /**
+     * Send a put request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doPut(String url, Map<String, String> params) throws Exception {
+
+        HttpPut httpPut = new HttpPut(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpPut.setConfig(requestConfig);
+
+        packageParam(params, httpPut);
+
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpPut);
+        }
+    }
+
+    /**
+     * Send delete request without request parameters
+     *
+     * @param url request address
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url) throws Exception {
+
+        HttpDelete httpDelete = new HttpDelete(url);
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(CONNECT_TIMEOUT).setSocketTimeout(SOCKET_TIMEOUT).build();
+        httpDelete.setConfig(requestConfig);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();) {
+            return getHttpClientResult(httpClient, httpDelete);
+        }
+    }
+
+    /**
+     * Send delete request with request parameters
+     *
+     * @param url    request address
+     * @param params request parameter map
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult doDelete(String url, Map<String, String> params) throws Exception {
+        if (params == null) {
+            params = new HashMap<>(INITIAL_CAPACITY);
+        }
+
+        params.put("_method", "delete");
+        return doPost(url, params);
+    }
+
+    /**
+     * encapsulate request header
+     *
+     * @param params     request header map
+     * @param httpMethod http request method
+     */
+    public static void packageHeader(Map<String, String> params, HttpRequestBase httpMethod) {
+        // encapsulate request header
+        if (params != null) {
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                // Set to the request header to the HttpRequestBase object
+                httpMethod.setHeader(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Encapsulate request parameters
+     *
+     * @param params     request parameter map
+     * @param httpMethod http request method
+     * @throws UnsupportedEncodingException exception information
+     */
+    public static void packageParam(Map<String, String> params, HttpEntityEnclosingRequestBase httpMethod) throws UnsupportedEncodingException {
+        // Encapsulate request parameters
+        if (params != null) {
+            List<NameValuePair> nvps = new ArrayList<>();
+            Set<Entry<String, String>> entrySet = params.entrySet();
+            for (Entry<String, String> entry : entrySet) {
+                nvps.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
+            }
+
+            // Set to the request's http object
+            httpMethod.setEntity(new UrlEncodedFormEntity(nvps, ENCODING));
+        }
+    }
+
+    /**
+     * get response result
+     *
+     * @param httpClient http client object
+     * @param httpMethod http method object
+     * @return http response result
+     * @throws Exception information
+     */
+    public static HttpClientResult getHttpClientResult(CloseableHttpClient httpClient, HttpRequestBase httpMethod) throws Exception {
+        // execute request
+        try (CloseableHttpResponse httpResponse = httpClient.execute(httpMethod)) {
+            // get return result
+            if (httpResponse != null && httpResponse.getStatusLine() != null) {
+                String content = "";
+                if (httpResponse.getEntity() != null) {
+                    content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
+                }
+                return new HttpClientResult(httpResponse.getStatusLine().getStatusCode(), content);
+            }
+        }
+        return new HttpClientResult(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+    }
+
+    /**
+     * release resources
+     *
+     * @param httpResponse http response object
+     * @param httpClient   http client object
+     * @throws IOException information
+     */
+    public static void release(CloseableHttpResponse httpResponse, CloseableHttpClient httpClient) throws IOException {
+        // release resources
+        if (httpResponse != null) {
+            httpResponse.close();
+        }
+        if (httpClient != null) {
+            httpClient.close();
+        }
+    }

Review Comment:
   The methods of this utility class are kept so that users can extend it in the future. I will add a comment here to say so.





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865760360


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   I am new to Flink, but I see that the method `getBatchTableEnvironment` has been marked as deprecated.





[GitHub] [incubator-seatunnel] tmljob commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
tmljob commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865756866


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);

Review Comment:
   This logs the value of the synchronization parameter so that problems are easier to locate and troubleshoot.
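
For context on why the logged value matters: elsewhere in this diff the sync values are merged into the request parameters with `requestMap.putAll(syncMap)`, so a sync entry replaces a configured request parameter of the same name. The following is only a minimal sketch of that behaviour using plain `java.util` maps. The class `SyncParamMerge`, the helper `merge`, and the keys `page` and `since` are hypothetical and do not appear in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SyncParamMerge {

    // Hypothetical helper, not part of the PR: returns a copy of the request
    // parameters with the sync values applied on top of them.
    public static Map<String, Object> merge(Map<String, Object> requestParams,
                                            Map<String, Object> syncValues) {
        Map<String, Object> merged = new LinkedHashMap<>(requestParams);
        // Same call as requestMap.putAll(syncMap) in the diff: on a key
        // collision the sync value replaces the configured one.
        merged.putAll(syncValues);
        return merged;
    }

    public static void main(String[] args) {
        // Illustrative values only; the real ones come from the job config
        // and from the file at the configured sync path.
        Map<String, Object> requestParams = new LinkedHashMap<>();
        requestParams.put("page", 1);
        requestParams.put("since", "2022-01-01");

        Map<String, Object> syncValues = new LinkedHashMap<>();
        syncValues.put("since", "2022-05-01");

        System.out.println(merge(requestParams, syncValues));
    }
}
```

Because the merged value silently wins, having the sync values in the log shows which parameters were actually sent with the request.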





[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #1798: [Feature][Connector]support flink-connecor-http

Posted by GitBox <gi...@apache.org>.
ruanwenjun commented on code in PR #1798:
URL: https://github.com/apache/incubator-seatunnel/pull/1798#discussion_r865795372


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-http/src/main/java/org/apache/seatunnel/flink/http/source/Http.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.flink.http.source;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.flink.batch.FlinkBatchSource;
+import org.apache.seatunnel.flink.http.source.constant.Settings;
+import org.apache.seatunnel.flink.http.source.util.HttpClientResult;
+import org.apache.seatunnel.flink.http.source.util.HttpClientUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Http implements FlinkBatchSource {
+
+    private static final String GET = "GET";
+    private static final String POST = "POST";
+    private static final int INITIAL_CAPACITY = 16;
+
+    private static Logger LOG = LoggerFactory.getLogger(Http.class);
+
+    private Config config;
+
+    @Override
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return CheckConfigUtil.checkAllExists(config, Settings.SOURCE_HTTP_URL);
+    }
+
+    @Override
+    public DataSet<Row> getData(FlinkEnvironment env) {
+        String url = config.getString(Settings.SOURCE_HTTP_URL);
+        String method = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_METHOD, GET);
+        String header = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_HEADER, "");
+        String requestParams = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_REQUEST_PARAMS, "");
+        String syncPath = TypesafeConfigUtils.getConfig(config, Settings.SOURCE_HTTP_SYNC_PATH, "");
+
+        Map requestMap = jsonToMap(requestParams);
+        String syncValues = getSyncValues(env.getBatchEnvironment(), syncPath);
+        LOG.info("sync values->{}", syncValues);
+        Map syncMap = jsonToMap(syncValues);
+        if (!syncMap.isEmpty()) {
+            requestMap.putAll(syncMap);
+        }
+
+        HttpClientResult response = new HttpClientResult();
+        try {
+            Map headerMap = jsonToMap(header);
+            if (POST.equals(method)) {
+                response = HttpClientUtils.doPost(url, headerMap, requestMap);
+            } else {
+                response = HttpClientUtils.doGet(url, headerMap, requestMap);
+            }
+        } catch (Exception e) {
+            LOG.error("http call error!", e);
+        }
+
+        LOG.info("http respond code->{}", response.getCode());
+        LOG.info("http respond body->{}", response.getContent());
+
+        Row row = Row.of(response.getContent());

Review Comment:
   I suggest using `env.getBatchTableEnvironment().toDataSet()` to define the field name.


