You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/07/02 12:56:26 UTC
[griffin] branch master updated: [GRIFFIN-247] Support pluggable
sinks
This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 42bd3e1 [GRIFFIN-247] Support pluggable sinks
42bd3e1 is described below
commit 42bd3e111db4b41e5ac23a14021d22973801c7db
Author: iyuriysoft <42...@users.noreply.github.com>
AuthorDate: Tue Jul 2 20:56:11 2019 +0800
[GRIFFIN-247] Support pluggable sinks
Author: iyuriysoft <42...@users.noreply.github.com>
Closes #500 from iyuriysoft/custom-sink.
---
griffin-doc/measure/measure-configuration-guide.md | 8 +++++-
.../measure/configuration/enums/SinkType.scala | 9 ++++++
.../apache/griffin/measure/sink/SinkContext.scala | 21 ++++++++++++++
.../apache/griffin/measure/sink/SinkFactory.scala | 32 ++++++++++++++++++++++
4 files changed, 69 insertions(+), 1 deletion(-)
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index 1013ae6..ac7b5c2 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -83,7 +83,7 @@ Above lists environment parameters.
- **griffin.checkpoint**: This field configures list of griffin checkpoint parameters, multiple cache ways are supported. It is only for streaming dq case. Details of info cache configuration [here](#griffin-checkpoint).
### <a name="sinks"></a>Sinks
-- **type**: Metrics and records sink type, "console", "hdfs", "http", "mongo".
+- **type**: Metrics and records sink type, "console", "hdfs", "http", "mongo", "custom".
- **config**: Configure parameters of each sink type.
+ console sink (aliases: "log")
* max.log.lines: the max lines of log.
@@ -98,6 +98,12 @@ Above lists environment parameters.
* url: url of mongo db.
* database: database name.
* collection: collection name.
+ + custom sink
+ * class: fully qualified class name of the user-provided data sink implementation.
+ The class must implement the org.apache.griffin.measure.sink.Sink trait and expose a static method with the signature
+ ```def apply(ctx: SinkContext): Sink```.
+ The user-provided data sink must be present on the Spark job's class path, either by passing the custom jar via the --jars option
+ of spark-submit or by adding it to the "jars" list in sparkProperties.json.
### <a name="griffin-checkpoint"></a>Griffin Checkpoint
- **type**: Griffin checkpoint type, "zk" for zookeeper checkpoint.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index d9e5d2b..2a6d335 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -34,6 +34,7 @@ object SinkType {
HdfsSinkType,
ElasticsearchSinkType,
MongoSinkType,
+ CustomSinkType,
UnknownSinkType
)
@@ -84,6 +85,14 @@ case object MongoSinkType extends SinkType {
val desc = "distinct"
}
+/**
+ * custom sink: selected by the sink type id "custom" (matched case-insensitively);
+ * the sink implementation is a user-provided class, so its jar has to be added to the job's class path
+ */
+case object CustomSinkType extends SinkType {
+ val idPattern = "^(?i)custom$".r
+ val desc = "custom"
+}
+
case object UnknownSinkType extends SinkType {
val idPattern = "".r
val desc = "unknown"
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkContext.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkContext.scala
new file mode 100644
index 0000000..50d9f60
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkContext.scala
@@ -0,0 +1,21 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+
+/**
+ * Bundle of the values handed to a user-provided (custom) sink when it is created.
+ * SinkFactory builds one instance per custom sink and passes it to the sink class's
+ * `apply(ctx: SinkContext)` factory method.
+ *
+ * @param config     configuration map of the sink (for a custom sink it contains the "class" entry)
+ * @param metricName metric name the sink factory was created with
+ * @param timeStamp  timestamp passed through unchanged from the sink factory
+ * @param block      flag passed through unchanged from the sink factory
+ *                   (presumably whether the sink should write in blocking mode -- confirm against the built-in sinks)
+ */
+case class SinkContext(config: Map[String, Any], metricName: String, timeStamp: Long, block: Boolean)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 623de18..7b8bd31 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -22,6 +22,7 @@ import scala.util.{Success, Try}
import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.utils.ParamUtil._
case class SinkFactory(sinkParamIter: Iterable[SinkParam],
metricName: String) extends Serializable {
@@ -45,6 +46,7 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
case HdfsSinkType => Try(HdfsSink(config, metricName, timeStamp))
case ElasticsearchSinkType => Try(ElasticSearchSink(config, metricName, timeStamp, block))
case MongoSinkType => Try(MongoSink(config, metricName, timeStamp, block))
+ case CustomSinkType => Try(getCustomSink(config, metricName, timeStamp, block))
case _ => throw new Exception(s"sink type ${sinkType} is not supported!")
}
sinkTry match {
@@ -53,4 +55,34 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
}
}
+ /**
+  * Creates a user-provided (custom) sink by reflection.
+  *
+  * The class named by the "class" entry of the sink config is loaded, checked to be a [[Sink]],
+  * and its static factory method `apply(ctx: SinkContext): Sink` is invoked with a
+  * [[SinkContext]] built from the arguments of this method.
+  *
+  * How it might look in env.json:
+  *
+  * "sinks": [
+  *   {
+  *     "type": "CUSTOM",
+  *     "config": {
+  *       "class": "com.yourcompany.griffin.sinks.MySuperSink",
+  *       "path": "/Users/Shared"
+  *     }
+  *   }
+  * ]
+  *
+  * @param config     sink configuration; must contain a non-empty "class" entry
+  * @param metricName metric name, forwarded to the sink through the context
+  * @param timeStamp  timestamp, forwarded to the sink through the context
+  * @param block      block flag, forwarded to the sink through the context
+  * @return the sink instance produced by the user class
+  * @throws IllegalArgumentException if the "class" entry is missing or blank
+  * @throws ClassNotFoundException   if the class is not on the class path
+  * @throws NoSuchMethodException    if the class has no static `apply(SinkContext)` method
+  * @throws ClassCastException       if the class, or the value returned by `apply`, is not a Sink
+  */
+ private def getCustomSink(config: Map[String, Any],
+                           metricName: String,
+                           timeStamp: Long,
+                           block: Boolean): Sink = {
+   val className = config.getString("class", "")
+   // fail early with a readable message instead of ClassNotFoundException("") from Class.forName
+   if (className.trim.isEmpty) {
+     throw new IllegalArgumentException("custom sink requires a non-empty \"class\" entry in its config")
+   }
+   val cls = Class.forName(className)
+   if (classOf[Sink].isAssignableFrom(cls)) {
+     val ctx = SinkContext(config, metricName, timeStamp, block)
+     val method = cls.getDeclaredMethod("apply", classOf[SinkContext])
+     // invoke(null, ...) on an instance method would only produce a bare NullPointerException
+     if (!java.lang.reflect.Modifier.isStatic(method.getModifiers)) {
+       throw new NoSuchMethodException(s"$className.apply(SinkContext) should be a static method")
+     }
+     val created = try {
+       method.invoke(null, ctx)
+     } catch {
+       // surface the exception thrown by the user's factory rather than the reflection wrapper
+       case e: java.lang.reflect.InvocationTargetException if e.getCause != null => throw e.getCause
+     }
+     created match {
+       case sink: Sink => sink
+       // also covers a null result, which an unchecked cast would have let through
+       case _ => throw new ClassCastException(s"$className.apply(SinkContext) should return a Sink")
+     }
+   } else {
+     throw new ClassCastException(s"$className should extend Sink")
+   }
+ }
+
}