You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/02/26 02:30:43 UTC
[GitHub] [incubator-seatunnel] tmljob edited a comment on issue #1270: Whether to support restful api (http) as input source?
tmljob edited a comment on issue #1270:
URL: https://github.com/apache/incubator-seatunnel/issues/1270#issuecomment-1051477335
I tried to develop an http input plug-in based on version 1.X, the function is relatively simple, and I verified that it can run. On this basis, the function can be further improved. The code example is as follows:
```scala
import io.github.interestinglab.waterdrop.apis.BaseStaticInput
import io.github.interestinglab.waterdrop.config.{Config, ConfigFactory}
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.util.EntityUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import java.io.IOException
class HttpStatic extends BaseStaticInput {
var config: Config = ConfigFactory.empty()
override def getDataset(spark: SparkSession): Dataset[Row] = {
var path = config.getString("path")
val response = getRequest(path)
val reader = spark.read.format("json")
var arr = Array(response)
var rdd = spark.sparkContext.parallelize(arr)
reader.json(rdd)
}
/**
* Set Config
* @param config
*/
override def setConfig(config: Config): Unit = {
this.config = config
}
/**
* Get Config
* @return
*/
override def getConfig(): Config = {
this.config
}
override def checkConfig(): (Boolean, String) = {
config.hasPath("path") match {
case true => (true,"")
case false => (false,"please specify [path] as non-empty string")
}
}
def getRequest(url: String): String = {
val httpClient = HttpClientBuilder.create().build()
val httpGet = new HttpGet(url)
var response: CloseableHttpResponse = null
var request = ""
try {
response = httpClient.execute(httpGet)
val entity = response.getEntity
// EntityUtils.
request = EntityUtils.toString(entity)
} catch {
case ex: Exception => {
ex.printStackTrace()
}
} finally {
try {
if (httpClient != null) httpClient.close()
if (response != null) response.close()
} catch {
case e: IOException =>
e.printStackTrace()
}
}
request
}
def postRequest(url: String): String = {
val httpClient = HttpClientBuilder.create().build()
val httpPost = new HttpPost(url)
httpPost.setHeader("Content-Type", "application/json;charset=utf8")
var response: CloseableHttpResponse = null
var request: String = ""
try {
response = httpClient.execute(httpPost)
val entity = response.getEntity
request = EntityUtils.toString(entity)
} catch {
case ex: Exception => {
ex.printStackTrace()
}
} finally {
try {
if (httpClient != null) httpClient.close()
if (response != null) response.close()
} catch {
case e: IOException =>
e.printStackTrace()
}
}
request
}
def postRequest(url: String,bodyData: String): String = {
val httpClient = HttpClientBuilder.create().build()
val httpPost = new HttpPost(url)
httpPost.setHeader("Content-Type", "application/json;charset=utf-8")
//设置超时时间
httpPost.setConfig(RequestConfig
.custom()
.setConnectTimeout(5000)
.setConnectionRequestTimeout(1000)
.setSocketTimeout(5000)
.build())
val entity = new StringEntity(bodyData,"utf-8")
entity.setChunked(false)
httpPost.setEntity(entity)
var response: CloseableHttpResponse = null
var request: String = ""
try {
response = httpClient.execute(httpPost)
request = EntityUtils.toString(response.getEntity,"UTF-8")
} finally {
try {
httpClient.close()
response.close()
} catch {
case e: IOException =>
e.printStackTrace()
}
}
request
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org