Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/02/17 03:22:21 UTC

[GitHub] [incubator-seatunnel] tmljob opened a new issue #1270: Whether to support restful api (http) as input source?

tmljob opened a new issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270


   ### Search before asking
   
   - [X] I had searched in the [feature](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.
   
   
   ### Description
   
   Could SeaTunnel support a RESTful API (HTTP) as an input source?
   
   ### Usage Scenario
   
   Our data collection scenarios need to call third-party interfaces or accept pushes from third-party interfaces.
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] tmljob edited a comment on issue #1270: Whether to support restful api (http) as input source?

Posted by GitBox <gi...@apache.org>.
tmljob edited a comment on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1051477335


   @kalencaya
   I developed a simple HTTP input plug-in based on version 1.X and verified that it runs. Its functionality can be extended from this base. The code is as follows:
   ```scala
   import io.github.interestinglab.waterdrop.apis.BaseStaticInput
   import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
   import org.apache.http.client.config.RequestConfig
   import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
   import org.apache.http.entity.StringEntity
   import org.apache.http.impl.client.HttpClientBuilder
   import org.apache.http.util.EntityUtils
   import org.apache.spark.sql.{Dataset, Row, SparkSession}
   
   import java.io.IOException
   
   class HttpStatic extends BaseStaticInput {
     var config: Config = ConfigFactory.empty()
   
     override def getDataset(spark: SparkSession): Dataset[Row] = {
       var path = config.getString("path")
       val response = getRequest(path)
       val reader = spark.read.format("json")
       var arr = Array(response)
       var rdd = spark.sparkContext.parallelize(arr)
       reader.json(rdd)
     }
   
     /**
      * Set Config
      * @param config
      */
     override def setConfig(config: Config): Unit = {
       this.config = config
     }
   
     /**
      * Get Config
      * @return
      */
     override def getConfig(): Config = {
       this.config
     }
   
     override def checkConfig(): (Boolean, String) = {
       // The plug-in only requires the [path] option (the URL to request).
       if (config.hasPath("path")) (true, "")
       else (false, "please specify [path] as non-empty string")
     }
   
     def getRequest(url: String): String = {
   
       val httpClient = HttpClientBuilder.create().build()
       val httpGet = new HttpGet(url)
       var response: CloseableHttpResponse = null
       var request = ""
       try {
         response = httpClient.execute(httpGet)
         val entity = response.getEntity
         request = EntityUtils.toString(entity)
       } catch {
         case ex: Exception => {
           ex.printStackTrace()
         }
       } finally {
         try {
           // Release the response before closing the client that produced it.
           if (response != null) response.close()
           if (httpClient != null) httpClient.close()
         } catch {
           case e: IOException =>
             e.printStackTrace()
         }
       }
       request
     }
   
     def postRequest(url: String): String = {
       val httpClient = HttpClientBuilder.create().build()
       val httpPost = new HttpPost(url)
       httpPost.setHeader("Content-Type", "application/json;charset=utf-8")
   
       var response: CloseableHttpResponse = null
       var request: String = ""
   
       try {
         response = httpClient.execute(httpPost)
         val entity = response.getEntity
         request = EntityUtils.toString(entity)
       } catch {
         case ex: Exception => {
           ex.printStackTrace()
         }
       } finally {
         try {
           // Release the response before closing the client that produced it.
           if (response != null) response.close()
           if (httpClient != null) httpClient.close()
         } catch {
           case e: IOException =>
             e.printStackTrace()
         }
       }
       request
     }
   
     def postRequest(url: String,bodyData: String): String = {
       val httpClient = HttpClientBuilder.create().build()
       val httpPost = new HttpPost(url)
       httpPost.setHeader("Content-Type", "application/json;charset=utf-8")
       // Set timeouts (milliseconds)
       httpPost.setConfig(RequestConfig
         .custom()
         .setConnectTimeout(5000)
         .setConnectionRequestTimeout(1000)
         .setSocketTimeout(5000)
         .build())
   
       val entity = new StringEntity(bodyData,"utf-8")
       entity.setChunked(false)
       httpPost.setEntity(entity)
       var response: CloseableHttpResponse = null
       var request: String = ""
   
       try {
         response = httpClient.execute(httpPost)
         request = EntityUtils.toString(response.getEntity,"UTF-8")
       } finally {
         try {
           // response is still null if execute() threw, so guard against an NPE.
           if (response != null) response.close()
           httpClient.close()
         } catch {
           case e: IOException =>
             e.printStackTrace()
         }
       }
       request
     }
   
   }
   ```
   





[GitHub] [incubator-seatunnel] ashulin commented on issue #1270: Whether to support restful api (http) as input source?

Posted by GitBox <gi...@apache.org>.
ashulin commented on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1086514626


   For generality, I feel the following points need to be considered:
   1. How do we obtain permission to call the HTTP API: SSL or a token?
   2. How do we get new data: by updating timestamps, by paging, or with no incremental logic at all?
   3. How do we process the data (JSON)? As with Kafka JSON deserialization, each user's format may differ.
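
   One way to make these three concerns explicit is to model them as plug-in options. The following is only a sketch under assumptions: `HttpSourceSketch`, `Auth`, `Increment`, `HttpSourceOptions`, and `authHeaders` are hypothetical names, not part of SeaTunnel, and SSL client configuration is left out.

   ```scala
   object HttpSourceSketch {
     // 1. Permissions: how a request is authorized (hypothetical model).
     sealed trait Auth
     case object NoAuth extends Auth
     final case class BearerToken(token: String) extends Auth

     // 2. New data: how the next request is derived from the previous one.
     sealed trait Increment
     case object FullFetch extends Increment                        // nothing at all
     final case class ByTimestamp(field: String) extends Increment  // update timestamps
     final case class ByPage(pageSize: Int) extends Increment       // flip pages

     // 3. Processing: where the records sit inside each user's JSON response.
     final case class HttpSourceOptions(
         url: String,
         auth: Auth,
         increment: Increment,
         recordPath: String)

     // Translate the chosen authorization into request headers.
     def authHeaders(auth: Auth): Map[String, String] = auth match {
       case NoAuth         => Map.empty
       case BearerToken(t) => Map("Authorization" -> s"Bearer $t")
     }
   }
   ```

   Keeping the three choices as separate fields would let a user combine, for example, token authorization with paging without changing the plug-in code.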





[GitHub] [incubator-seatunnel] tmljob commented on issue #1270: Whether to support restful api (http) as input source?

Posted by GitBox <gi...@apache.org>.
tmljob commented on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1054021091


   > Thanks for sharing.
   > 
   > My data sync work is always limited by the rate limiters of some HTTP interfaces, especially for incremental sync. A typical HTTP interface that supports incremental sync takes the following request parameters, or a variant of them:
   > 
   > ```
   > {
   >     "startTime": "yyyy-MM-dd HH:mm:ss",
   >     "endTime": "yyyy-MM-dd HH:mm:ss",
   >     "pageIndex": 1,
   >     "pageSize": 50
   > }
   > ```
   > 
   > Because of the rate limiter and the request parameter limits, I have to control concurrency and handle annoying network failures to get accurate and fast sync results. So my core work is to split the sync task, execute the parts concurrently, and update the sync offset (the latest synced time interval). I also believe the Flink source framework can handle that process better.
   > 
   > HTTP authorization requires every request to provide credentials, and I have more than 2500 credential accounts. Unbalanced hot data can appear in any credential account at any time, so I started a sync job for every credential account and every HTTP interface, just for the incremental scenario. How to handle this many HTTP interface credentials is what really blocks me now.
   > 
   > Now I'm taking advantage of Akka's excellent concurrency and xxl-job to refactor the sync framework. You can contact me on WeChat; my ID is also kalencaya.
   
   I can't find you through that WeChat account. Please see if you can add me through this account: **tmljob123**. I will set up a WeChat group so we can discuss with another contributor.
   @kalencaya 








[GitHub] [incubator-seatunnel] CalvinKirs commented on issue #1270: Whether to support restful api (http) as input source?

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1063596973


   > My opinion is that we can implement a spark-connector-http based on the Spark DataSource API and then integrate it.
   
   @tmljob @zhaomin1423 Contributions welcome





[GitHub] [incubator-seatunnel] kalencaya commented on issue #1270: Whether to support restful api (http) as input source?

Posted by GitBox <gi...@apache.org>.
kalencaya commented on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1049413986


   Good idea.
   I have faced similar confusion, since my main work is collecting data from the HTTP interfaces of third-party open platforms.
   
   But the interfaces' rate limiters and authorization prevent me from syncing data with Flink, so I have to do it another way.
   
   I'm glad and willing to communicate more with you.





[GitHub] [incubator-seatunnel] zhaomin1423 commented on issue #1270: Whether to support restful api (http) as input source?

Posted by GitBox <gi...@apache.org>.
zhaomin1423 commented on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1054039884


   My opinion is that we can implement a spark-connector-http based on the Spark DataSource API and then integrate it.





[GitHub] [incubator-seatunnel] zhaomin1423 commented on issue #1270: Whether to support restful api (http) as input source?

Posted by GitBox <gi...@apache.org>.
zhaomin1423 commented on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1063603534


   > > My opinion is that we can implement a spark-connector-http based on the Spark DataSource API and then integrate it.
   > 
   > @tmljob @zhaomin1423 Contributions welcome
   
   I will implement a spark-connector-http soon.








[GitHub] [incubator-seatunnel] kalencaya commented on issue #1270: Whether to support restful api (http) as input source?

Posted by GitBox <gi...@apache.org>.
kalencaya commented on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1053830971


   Thanks for sharing.
   
   My data sync work is always limited by the rate limiters of some HTTP interfaces, especially for incremental sync.
   A typical HTTP interface that supports incremental sync takes the following request parameters, or a variant of them:
   ```
   {
       "startTime": "yyyy-MM-dd HH:mm:ss",
       "endTime": "yyyy-MM-dd HH:mm:ss",
       "pageIndex": 1,
       "pageSize": 50
   }
   ```
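
   As an illustration of how such request bodies could be generated page by page, here is a minimal sketch. The field names come from the example above; the object `PageRequests` and its helpers are hypothetical, and the sketch assumes the interface reports a total record count, which not every API does.

   ```scala
   object PageRequests {
     // Build one JSON request body using the field names from the example above.
     def body(startTime: String, endTime: String, pageIndex: Int, pageSize: Int): String =
       s"""{"startTime": "$startTime", "endTime": "$endTime", "pageIndex": $pageIndex, "pageSize": $pageSize}"""

     // Number of pages needed to cover `total` records (assumes the API reports a total).
     def pageCount(total: Long, pageSize: Int): Int = {
       require(pageSize > 0, "pageSize must be positive")
       ((total + pageSize - 1) / pageSize).toInt
     }

     // One request body per page, with pageIndex starting at 1.
     def bodies(startTime: String, endTime: String, total: Long, pageSize: Int): Seq[String] =
       (1 to pageCount(total, pageSize)).map(i => body(startTime, endTime, i, pageSize))
   }
   ```

   For example, `PageRequests.bodies("2022-02-17 00:00:00", "2022-02-17 01:00:00", 120L, 50)` yields three bodies, for pages 1 to 3.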
   Because of the rate limiter and the request parameter limits, I have to control concurrency and handle annoying network failures to get accurate and fast sync results. So my core work is to split the sync task, execute the parts concurrently, and update the sync offset (the latest synced time interval). I also believe the Flink source framework can handle that process better.
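
   The splitting and failure handling described here can be sketched roughly as follows. This is not the actual framework from this comment: `SyncSplit`, `windows`, and `retry` are hypothetical names, and the doubling back-off is just one assumed retry policy.

   ```scala
   import java.time.{Duration, LocalDateTime}
   import scala.annotation.tailrec
   import scala.util.{Failure, Success, Try}

   object SyncSplit {
     // Split [start, end) into consecutive windows no longer than `step`.
     // The end of the last completed window can be stored as the sync offset.
     def windows(
         start: LocalDateTime,
         end: LocalDateTime,
         step: Duration): List[(LocalDateTime, LocalDateTime)] = {
       require(!step.isZero && !step.isNegative, "step must be positive")
       Iterator
         .iterate(start)(_.plus(step))
         .takeWhile(_.isBefore(end))
         .map { s =>
           val e = s.plus(step)
           (s, if (e.isAfter(end)) end else e)
         }
         .toList
     }

     // Retry a failing call up to `attempts` times, doubling the wait each time.
     @tailrec
     def retry[A](attempts: Int, waitMillis: Long)(call: => A): Try[A] =
       Try(call) match {
         case s @ Success(_) => s
         case Failure(_) if attempts > 1 =>
           Thread.sleep(waitMillis)
           retry(attempts - 1, waitMillis * 2)(call)
         case f => f
       }
   }
   ```

   Each window can then be fetched independently, for instance with `SyncSplit.retry(3, 500L)(fetch(window))`, where `fetch` stands for whatever function performs the HTTP call.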
   
   HTTP authorization requires every request to provide credentials, and I have more than 2500 credential accounts. Unbalanced hot data can appear in any credential account at any time, so I started a sync job for every credential account and every HTTP interface, just for the incremental scenario. How to handle this many HTTP interface credentials is what really blocks me now.
   
   Now I'm taking advantage of Akka's excellent concurrency and xxl-job to refactor the sync framework. You can contact me on WeChat; my ID is also kalencaya.








[GitHub] [incubator-seatunnel] zhaomin1423 commented on issue #1270: Whether to support restful api (http) as input source?

Posted by GitBox <gi...@apache.org>.
zhaomin1423 commented on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1053916304


   > Thanks for sharing.
   > 
   > My data sync work is always limited by the rate limiters of some HTTP interfaces, especially for incremental sync. A typical HTTP interface that supports incremental sync takes the following request parameters, or a variant of them:
   > 
   > ```
   > {
   >     "startTime": "yyyy-MM-dd HH:mm:ss",
   >     "endTime": "yyyy-MM-dd HH:mm:ss",
   >     "pageIndex": 1,
   >     "pageSize": 50
   > }
   > ```
   > 
   > Because of the rate limiter and the request parameter limits, I have to control concurrency and handle annoying network failures to get accurate and fast sync results. So my core work is to split the sync task, execute the parts concurrently, and update the sync offset (the latest synced time interval). I also believe the Flink source framework can handle that process better.
   > 
   > HTTP authorization requires every request to provide credentials, and I have more than 2500 credential accounts. Unbalanced hot data can appear in any credential account at any time, so I started a sync job for every credential account and every HTTP interface, just for the incremental scenario. How to handle this many HTTP interface credentials is what really blocks me now.
   > 
   > Now I'm taking advantage of Akka's excellent concurrency and xxl-job to refactor the sync framework. You can contact me on WeChat; my ID is also kalencaya.
   
   I am willing to work with you. My wechat is 602128569.

