You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/28 04:55:53 UTC
[incubator-seatunnel] branch dev updated: [Bug][Connector]fix runtime errors of webhook connector (#1758)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new be92f6f1 [Bug][Connector]fix runtime errors of webhook connector (#1758)
be92f6f1 is described below
commit be92f6f1624dc101b8a81ec56c0282bef3b93437
Author: tmljob <69...@users.noreply.github.com>
AuthorDate: Thu Apr 28 12:55:49 2022 +0800
[Bug][Connector]fix runtime errors of webhook connector (#1758)
* improve webhook connector
* use Long (epoch milliseconds) instead of Timestamp for HttpData.timestamp
---
docs/en/connector/source/Webhook.md | 6 +++---
.../services}/org.apache.seatunnel.spark.BaseSparkSource | 2 +-
.../apache/seatunnel/spark/webhook/source/HttpData.scala | 4 +---
.../seatunnel/spark/webhook/source/HttpPushServlet.scala | 9 ++++-----
.../source/{JettyServerStream.scala => JettyServer.scala} | 2 +-
.../webhook/source/{Webhook.scala => WebhookStream.scala} | 14 ++++++++++++--
6 files changed, 22 insertions(+), 15 deletions(-)
diff --git a/docs/en/connector/source/Webhook.md b/docs/en/connector/source/Webhook.md
index d669ad52..d8998ecc 100644
--- a/docs/en/connector/source/Webhook.md
+++ b/docs/en/connector/source/Webhook.md
@@ -1,4 +1,4 @@
-# Webhook
+# WebhookStream
## Description
@@ -8,7 +8,7 @@ Provides an HTTP interface for pushing data; only POST requests are supported.
Engine Supported and plugin name
-* [x] Spark: Webhook
+* [x] Spark: WebhookStream
* [ ] Flink
:::
@@ -35,7 +35,7 @@ Source plugin common parameters, please refer to [Source Plugin](common-options.
## Example
```
-Webhook {
+WebhookStream {
result_table_name = "request_body"
}
```
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
similarity index 93%
rename from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
rename to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
index d66063ec..0f4f54ec 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.seatunnel.spark.webhook.source.Webhook
+org.apache.seatunnel.spark.webhook.source.WebhookStream
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
index e3a1c9db..310001a6 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
@@ -17,12 +17,10 @@
package org.apache.seatunnel.spark.webhook.source
-import java.util.Date
-
/**
* Streaming data read from local server will have this schema
*
* @param value - The payload POSTed to http endpoint.
* @param timestamp - Timestamp of when it was put on a stream.
*/
-case class HttpData(value: String, timestamp: Date)
+case class HttpData(value: String, timestamp: Long)
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
index 09205047..e1c6c5bc 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
@@ -17,18 +17,17 @@
package org.apache.seatunnel.spark.webhook.source
-import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import scala.io.Source
import org.apache.spark.sql.execution.streaming.MemoryStream
-import java.util.Date
+import java.sql.Timestamp
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
+import scala.io.Source
class HttpPushServlet(stream: MemoryStream[HttpData]) extends HttpServlet {
override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
val resBody = Source.fromInputStream(req.getInputStream).mkString
- val timestamp = new Date(System.currentTimeMillis())
- stream.addData(HttpData(resBody, timestamp))
+ stream.addData(HttpData(resBody, System.currentTimeMillis()))
resp.setContentType("application/json;charset=utf-8")
resp.setStatus(HttpServletResponse.SC_OK)
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServer.scala
similarity index 96%
rename from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
rename to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServer.scala
index 78c59eed..529e9a62 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServer.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.streaming.MemoryStream
import org.spark_project.jetty.server.Server
import org.spark_project.jetty.servlet.{ServletContextHandler, ServletHolder}
-class JettyServerStream(port: Int = 9999, baseUrl: String = "/") {
+class JettyServer(port: Int = 9999, baseUrl: String = "/") {
// Create server
var server: Server = new Server(port)
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/WebhookStream.scala
similarity index 86%
rename from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
rename to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/WebhookStream.scala
index eaa5e0ae..147e51a5 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/WebhookStream.scala
@@ -23,7 +23,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession, SQLContext}
import org.apache.spark.streaming.dstream.DStream
-class Webhook extends SparkStreamingSource[String] {
+class WebhookStream extends SparkStreamingSource[String] {
+
+ // Create server
+ var server: JettyServer = null
override def start(env: SparkEnvironment, handler: Dataset[Row] => Unit): Unit = {
var spark = env.getSparkSession
@@ -33,7 +36,8 @@ class Webhook extends SparkStreamingSource[String] {
var port = if (config.hasPath("port")) config.getInt("port") else 9999
var baseUrl = if (config.hasPath("path")) config.getString("path") else "/"
- val query = new JettyServerStream(port, baseUrl)
+ this.server = new JettyServer(port, baseUrl)
+ val query = this.server
.toDF
.writeStream
.foreachBatch((batch, batchId) => {
@@ -44,6 +48,12 @@ class Webhook extends SparkStreamingSource[String] {
query.awaitTermination()
}
+ def stop(): Unit = {
+ if (this.server != null) {
+ this.server.stop()
+ }
+ }
+
override def rdd2dataset(sparkSession: SparkSession, rdd: RDD[String]): Dataset[Row] = { null }
override def getData(env: SparkEnvironment): DStream[String] = { null }