You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/02/26 02:27:50 UTC

[GitHub] [incubator-seatunnel] tmljob commented on issue #1270: Whether to support restful api (http) as input source?

tmljob commented on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1051477335


   I tried to develop an http input plug-in based on version 1.X, the function is relatively simple, and I verified that it can run. On this basis, the function can be further improved. The code example is as follows:
   ```
   import io.github.interestinglab.waterdrop.apis.BaseStaticInput
   import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
   import org.apache.http.client.config.RequestConfig
   import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
   import org.apache.http.entity.StringEntity
   import org.apache.http.impl.client.HttpClientBuilder
   import org.apache.http.util.EntityUtils
   import org.apache.spark.sql.{Dataset, Row, SparkSession}
   
   import java.io.IOException
   
   class HttpStatic extends BaseStaticInput {
     var config: Config = ConfigFactory.empty()
   
     override def getDataset(spark: SparkSession): Dataset[Row] = {
       var path = config.getString("path")
       val response = getRequest(path)
       val reader = spark.read.format("json")
       var arr = Array(response)
       var rdd = spark.sparkContext.parallelize(arr)
       reader.json(rdd)
     }
   
     /**
      * Set Config
      * @param config
      */
     override def setConfig(config: Config): Unit = {
       this.config = config
     }
   
     /**
      * Get Config
      * @return
      */
     override def getConfig(): Config = {
       this.config
     }
   
     override def checkConfig(): (Boolean, String) = {
       config.hasPath("path") match {
         case true => (true,"")
         case  false => (false,"please specify [path] as non-empty string")
         }
       }
   
     def getRequest(url: String): String = {
   
       val httpClient = HttpClientBuilder.create().build()
       val httpGet = new HttpGet(url)
       var response: CloseableHttpResponse = null
       var request = ""
       try {
         response = httpClient.execute(httpGet)
         val entity = response.getEntity
         //      EntityUtils.
         request = EntityUtils.toString(entity)
       } catch {
         case ex: Exception => {
           ex.printStackTrace()
         }
       } finally {
         try {
           if (httpClient != null) httpClient.close()
           if (response != null) response.close()
         } catch {
           case e: IOException =>
             e.printStackTrace()
         }
       }
       request
     }
   
     def postRequest(url: String): String = {
       val httpClient = HttpClientBuilder.create().build()
       val httpPost = new HttpPost(url)
       httpPost.setHeader("Content-Type", "application/json;charset=utf8")
   
       var response: CloseableHttpResponse = null
       var request: String = ""
   
       try {
         response = httpClient.execute(httpPost)
         val entity = response.getEntity
         request = EntityUtils.toString(entity)
       } catch {
         case ex: Exception => {
           ex.printStackTrace()
         }
       } finally {
         try {
           if (httpClient != null) httpClient.close()
           if (response != null) response.close()
         } catch {
           case e: IOException =>
             e.printStackTrace()
         }
       }
       request
     }
   
     def postRequest(url: String,bodyData: String): String = {
       val httpClient = HttpClientBuilder.create().build()
       val httpPost = new HttpPost(url)
       httpPost.setHeader("Content-Type", "application/json;charset=utf-8")
       //设置超时时间
       httpPost.setConfig(RequestConfig
         .custom()
         .setConnectTimeout(5000)
         .setConnectionRequestTimeout(1000)
         .setSocketTimeout(5000)
         .build())
   
       val entity = new StringEntity(bodyData,"utf-8")
       entity.setChunked(false)
       httpPost.setEntity(entity)
       var response: CloseableHttpResponse = null
       var request: String = ""
   
       try {
         response = httpClient.execute(httpPost)
         request = EntityUtils.toString(response.getEntity,"UTF-8")
       } finally {
         try {
           httpClient.close()
           response.close()
         } catch {
           case e: IOException =>
             e.printStackTrace()
         }
       }
       request
     }
   
   }
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org