You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/07/02 12:56:26 UTC

[griffin] branch master updated: [GRIFFIN-247] Support pluggable sinks

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 42bd3e1  [GRIFFIN-247] Support pluggable sinks
42bd3e1 is described below

commit 42bd3e111db4b41e5ac23a14021d22973801c7db
Author: iyuriysoft <42...@users.noreply.github.com>
AuthorDate: Tue Jul 2 20:56:11 2019 +0800

    [GRIFFIN-247] Support pluggable sinks
    
    Author: iyuriysoft <42...@users.noreply.github.com>
    
    Closes #500 from iyuriysoft/custom-sink.
---
 griffin-doc/measure/measure-configuration-guide.md |  8 +++++-
 .../measure/configuration/enums/SinkType.scala     |  9 ++++++
 .../apache/griffin/measure/sink/SinkContext.scala  | 21 ++++++++++++++
 .../apache/griffin/measure/sink/SinkFactory.scala  | 32 ++++++++++++++++++++++
 4 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index 1013ae6..ac7b5c2 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -83,7 +83,7 @@ Above lists environment parameters.
 - **griffin.checkpoint**: This field configures list of griffin checkpoint parameters, multiple cache ways are supported. It is only for streaming dq case. Details of info cache configuration [here](#griffin-checkpoint).
 
 ### <a name="sinks"></a>Sinks
-- **type**: Metrics and records sink type, "console", "hdfs", "http", "mongo". 
+- **type**: Metrics and records sink type, "console", "hdfs", "http", "mongo", "custom". 
 - **config**: Configure parameters of each sink type.
 	+ console sink (aliases: "log")
 		* max.log.lines: the max lines of log.
@@ -98,6 +98,12 @@ Above lists environment parameters.
         * url: url of mongo db.
         * database: database name.
         * collection: collection name. 
+    + custom sink
+        * class: fully qualified name of the user-provided sink class. The class must implement the
+        org.apache.griffin.measure.sink.Sink trait and have a static method (e.g. on its companion object)
+        with the signature `def apply(ctx: SinkContext): Sink`; the whole config map is available as `ctx.config`.
+        The class must be on the Spark job's class path: pass its jar to spark-submit with the --jars option,
+        or add it to the "jars" list in sparkProperties.json.
 
 ### <a name="griffin-checkpoint"></a>Griffin Checkpoint
 - **type**: Griffin checkpoint type, "zk" for zookeeper checkpoint.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index d9e5d2b..2a6d335 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -34,6 +34,7 @@ object SinkType {
     HdfsSinkType,
     ElasticsearchSinkType,
     MongoSinkType,
+    CustomSinkType,
     UnknownSinkType
   )
 
@@ -84,6 +85,14 @@ case object MongoSinkType extends SinkType {
   val desc = "distinct"
 }
 
+/**
+  * custom sink: a user-provided Sink class, supplied on the classpath via an extra jar
+  */
+case object CustomSinkType extends SinkType {
+  val desc = "custom"
+  val idPattern = """^(?i)custom$""".r
+}
+
 case object UnknownSinkType extends SinkType {
   val idPattern = "".r
   val desc = "unknown"
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkContext.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkContext.scala
new file mode 100644
index 0000000..50d9f60
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkContext.scala
@@ -0,0 +1,21 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+/** Arguments handed to a custom sink's static `apply(ctx: SinkContext): Sink` factory. */
+final case class SinkContext(config: Map[String, Any], metricName: String, timeStamp: Long, block: Boolean)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 623de18..7b8bd31 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -22,6 +22,7 @@ import scala.util.{Success, Try}
 
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.utils.ParamUtil._
 
 case class SinkFactory(sinkParamIter: Iterable[SinkParam],
                        metricName: String) extends Serializable {
@@ -45,6 +46,7 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
       case HdfsSinkType => Try(HdfsSink(config, metricName, timeStamp))
       case ElasticsearchSinkType => Try(ElasticSearchSink(config, metricName, timeStamp, block))
       case MongoSinkType => Try(MongoSink(config, metricName, timeStamp, block))
+      case CustomSinkType => Try(getCustomSink(config, metricName, timeStamp, block))
       case _ => throw new Exception(s"sink type ${sinkType} is not supported!")
     }
     sinkTry match {
@@ -53,4 +55,34 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
     }
   }
 
+  /**
+    * Instantiates the user sink class named by `config("class")`, which must extend [[Sink]]
+    * and have a static `apply(ctx: SinkContext): Sink` (e.g. on its companion object). Example:
+    * {{{
+    *   "sinks": [{"type": "CUSTOM", "config": {
+    *     "class": "com.yourcompany.griffin.sinks.MySuperSink", "path": "/Users/Shared"}}]
+    * }}}
+    * @throws IllegalArgumentException if "class" is empty or its `apply` is not static
+    */
+  private def getCustomSink(config: Map[String, Any],
+                            metricName: String,
+                            timeStamp: Long,
+                            block: Boolean): Sink = {
+    val className = config.getString("class", "")
+    require(className.nonEmpty, "custom sink config must specify a non-empty \"class\"")
+    val cls = Class.forName(className)
+    if (!classOf[Sink].isAssignableFrom(cls)) {
+      throw new ClassCastException(s"$className should extend Sink")
+    }
+    val method = cls.getDeclaredMethod("apply", classOf[SinkContext])
+    require(java.lang.reflect.Modifier.isStatic(method.getModifiers),
+      s"$className.apply(SinkContext) must be static (defined on its companion object)")
+    val ctx = SinkContext(config, metricName, timeStamp, block)
+    // unwrap InvocationTargetException so the sink's own error reaches the log
+    try method.invoke(null, ctx).asInstanceOf[Sink]
+    catch {
+      case e: java.lang.reflect.InvocationTargetException => throw Option(e.getCause).getOrElse(e)
+    }
+  }
+
 }