Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/28 04:55:53 UTC

[incubator-seatunnel] branch dev updated: [Bug][Connector]fix runtime errors of webhook connector (#1758)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new be92f6f1 [Bug][Connector]fix runtime errors of webhook connector (#1758)
be92f6f1 is described below

commit be92f6f1624dc101b8a81ec56c0282bef3b93437
Author: tmljob <69...@users.noreply.github.com>
AuthorDate: Thu Apr 28 12:55:49 2022 +0800

    [Bug][Connector]fix runtime errors of webhook connector (#1758)
    
    * improve webhook connector
    * use long instead of TimeStamp
---
 docs/en/connector/source/Webhook.md                        |  6 +++---
 .../services}/org.apache.seatunnel.spark.BaseSparkSource   |  2 +-
 .../apache/seatunnel/spark/webhook/source/HttpData.scala   |  4 +---
 .../seatunnel/spark/webhook/source/HttpPushServlet.scala   |  9 ++++-----
 .../source/{JettyServerStream.scala => JettyServer.scala}  |  2 +-
 .../webhook/source/{Webhook.scala => WebhookStream.scala}  | 14 ++++++++++++--
 6 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/docs/en/connector/source/Webhook.md b/docs/en/connector/source/Webhook.md
index d669ad52..d8998ecc 100644
--- a/docs/en/connector/source/Webhook.md
+++ b/docs/en/connector/source/Webhook.md
@@ -1,4 +1,4 @@
-# Webhook
+# WebhookStream
 
 ## Description
 
@@ -8,7 +8,7 @@ Provide http interface to push data,only supports post requests.
 
 Engine Supported and plugin name
 
-* [x] Spark: Webhook
+* [x] Spark: WebhookStream
 * [ ] Flink
 
 :::
@@ -35,7 +35,7 @@ Source plugin common parameters, please refer to [Source Plugin](common-options.
 ## Example
 
 ```
-Webhook {
+WebhookStream {
       result_table_name = "request_body"
    }
 ```
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
similarity index 93%
rename from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
rename to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
index d66063ec..0f4f54ec 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF.services/org.apache.seatunnel.spark.BaseSparkSource
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkSource
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.seatunnel.spark.webhook.source.Webhook
+org.apache.seatunnel.spark.webhook.source.WebhookStream
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
index e3a1c9db..310001a6 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpData.scala
@@ -17,12 +17,10 @@
 
 package org.apache.seatunnel.spark.webhook.source
 
-import java.util.Date
-
 /**
  * Streaming data read from local server will have this schema
  *
  * @param value - The payload POSTed to http endpoint.
  * @param timestamp - Timestamp of when it was put on a stream.
  */
-case class HttpData(value: String, timestamp: Date)
+case class HttpData(value: String, timestamp: Long)
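
With the timestamp now stored as plain epoch milliseconds (a Long), a consumer that needs a SQL-friendly type can rebuild a java.sql.Timestamp from it. The snippet below is a minimal sketch for illustration only, not part of the commit; the object name and sample payload are hypothetical.

```
// Illustrative sketch (not part of the commit): rebuild a java.sql.Timestamp
// from the Long epoch-millisecond field when a SQL-friendly type is needed.
import java.sql.Timestamp

// Same shape as the case class defined in the diff above.
case class HttpData(value: String, timestamp: Long)

object HttpDataExample {
  def main(args: Array[String]): Unit = {
    val data = HttpData("""{"event":"test"}""", System.currentTimeMillis())
    val receivedAt = new Timestamp(data.timestamp)
    println(s"payload=${data.value}, receivedAt=$receivedAt")
  }
}
```
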
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
index 09205047..e1c6c5bc 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/HttpPushServlet.scala
@@ -17,18 +17,17 @@
 
 package org.apache.seatunnel.spark.webhook.source
 
-import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
-import scala.io.Source
 import org.apache.spark.sql.execution.streaming.MemoryStream
 
-import java.util.Date
+import java.sql.Timestamp
+import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
+import scala.io.Source
 
 class HttpPushServlet(stream: MemoryStream[HttpData]) extends HttpServlet {
 
   override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
     val resBody = Source.fromInputStream(req.getInputStream).mkString
-    val timestamp = new Date(System.currentTimeMillis())
-    stream.addData(HttpData(resBody, timestamp))
+    stream.addData(HttpData(resBody, System.currentTimeMillis()))
 
     resp.setContentType("application/json;charset=utf-8")
     resp.setStatus(HttpServletResponse.SC_OK)
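
For context, the servlet accepts only POST requests, reads the raw request body, and appends it to the MemoryStream tagged with the current epoch milliseconds before replying 200. The client-side sketch below is an assumption for illustration only; the host, port, path, and payload are hypothetical and use the defaults that WebhookStream falls back to further down.

```
// Hypothetical client sketch: POST a JSON payload to the webhook endpoint.
// Assumes the default port 9999 and base path "/"; not part of the commit.
import java.io.OutputStreamWriter
import java.net.{HttpURLConnection, URL}

object WebhookPostExample {
  def main(args: Array[String]): Unit = {
    val conn = new URL("http://localhost:9999/")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setDoOutput(true)
    conn.setRequestProperty("Content-Type", "application/json;charset=utf-8")

    val writer = new OutputStreamWriter(conn.getOutputStream, "UTF-8")
    writer.write("""{"event":"test","value":42}""")
    writer.close()

    // HttpPushServlet responds with SC_OK once the body is added to the stream.
    println(s"response code: ${conn.getResponseCode}")
    conn.disconnect()
  }
}
```
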
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServer.scala
similarity index 96%
rename from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
rename to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServer.scala
index 78c59eed..529e9a62 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServerStream.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/JettyServer.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.spark_project.jetty.server.Server
 import org.spark_project.jetty.servlet.{ServletContextHandler, ServletHolder}
 
-class JettyServerStream(port: Int = 9999, baseUrl: String = "/") {
+class JettyServer(port: Int = 9999, baseUrl: String = "/") {
 
   // Create server
   var server: Server = new Server(port)
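
The hunk above only shows the class rename; how the renamed JettyServer mounts HttpPushServlet is not part of this excerpt. As a rough sketch of the usual pattern with the shaded Jetty classes imported above (class and member names here are illustrative, not the file's actual contents):

```
// Illustrative sketch only: a typical way an embedded Jetty server could
// mount HttpPushServlet, using the same shaded org.spark_project.jetty
// classes imported above. Not the actual contents of JettyServer.scala.
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.spark_project.jetty.server.Server
import org.spark_project.jetty.servlet.{ServletContextHandler, ServletHolder}

class JettyServerSketch(stream: MemoryStream[HttpData],
                        port: Int = 9999,
                        baseUrl: String = "/") {
  val server: Server = new Server(port)

  private val context = new ServletContextHandler()
  context.setContextPath("/")
  // Route requests for baseUrl to the servlet that feeds the MemoryStream.
  context.addServlet(new ServletHolder(new HttpPushServlet(stream)), baseUrl)
  server.setHandler(context)

  def start(): Unit = server.start()
  def stop(): Unit = server.stop()
}
```
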
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/WebhookStream.scala
similarity index 86%
rename from seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
rename to seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/WebhookStream.scala
index eaa5e0ae..147e51a5 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/Webhook.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-webhook/src/main/scala/org/apache/seatunnel/spark/webhook/source/WebhookStream.scala
@@ -23,7 +23,10 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row, SparkSession, SQLContext}
 import org.apache.spark.streaming.dstream.DStream
 
-class Webhook extends SparkStreamingSource[String] {
+class WebhookStream extends SparkStreamingSource[String] {
+
+  // Create server
+  var server: JettyServer = null
 
   override def start(env: SparkEnvironment, handler: Dataset[Row] => Unit): Unit = {
     var spark = env.getSparkSession
@@ -33,7 +36,8 @@ class Webhook extends SparkStreamingSource[String] {
     var port = if (config.hasPath("port")) config.getInt("port") else 9999
     var baseUrl = if (config.hasPath("path")) config.getString("path") else "/"
 
-    val query = new JettyServerStream(port, baseUrl)
+    this.server = new JettyServer(port, baseUrl)
+    val query = this.server
       .toDF
       .writeStream
       .foreachBatch((batch, batchId) => {
@@ -44,6 +48,12 @@ class Webhook extends SparkStreamingSource[String] {
     query.awaitTermination()
   }
 
+  def stop(): Unit = {
+    if (this.server != null) {
+      this.server.stop()
+    }
+  }
+
   override def rdd2dataset(sparkSession: SparkSession, rdd: RDD[String]): Dataset[Row] = { null }
 
   override def getData(env: SparkEnvironment): DStream[String] = { null }
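
On the consumer side (an assumption, not shown in the commit), the rows streamed out of the MemoryStream now carry epoch milliseconds in a Long column, so a batch handler that wants a proper Spark timestamp column can cast it back, for example:

```
// Illustrative sketch, not part of the commit: convert the Long
// epoch-millisecond column produced by HttpData into a Spark TimestampType
// column inside a foreachBatch handler.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object TimestampColumnSketch {
  // Casting a numeric value to "timestamp" treats it as seconds since the
  // epoch, so divide the millisecond value by 1000 first.
  def withEventTime(batch: DataFrame): DataFrame =
    batch.withColumn("event_time", (col("timestamp") / 1000).cast("timestamp"))
}
```
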