You are viewing a plain-text version of this content. Its canonical link was a hyperlink on the original page and is not preserved in this version.
Posted to commits@griffin.apache.org by li...@apache.org on 2019/05/15 05:48:18 UTC
[griffin] branch master updated: Minor Refactor Codes
This is an automated email from the ASF dual-hosted git repository.
liujin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 81cc6c8 Minor Refactor Codes
81cc6c8 is described below
commit 81cc6c81d791772fb61b6da58c80266d937ffa8e
Author: Eugene <li...@apache.org>
AuthorDate: Wed May 15 13:47:34 2019 +0800
Minor Refactor Codes
1. Update sink code
2. Trim sink code
Author: Eugene <li...@apache.org>
Closes #501 from toyboxman/refactor.
---
.../apache/griffin/measure/sink/ConsoleSink.scala | 5 +++-
.../griffin/measure/sink/ElasticSearchSink.scala | 8 ++++---
.../org/apache/griffin/measure/sink/HdfsSink.scala | 13 +++++++----
.../apache/griffin/measure/sink/MongoSink.scala | 8 ++++---
.../apache/griffin/measure/sink/MultiSinks.scala | 27 ++++++++++++++--------
.../apache/griffin/measure/sink/SinkFactory.scala | 14 +++++------
.../apache/griffin/measure/utils/HdfsUtil.scala | 14 ++++++-----
7 files changed, 55 insertions(+), 34 deletions(-)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index feebd91..20f4e13 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -26,7 +26,10 @@ import org.apache.griffin.measure.utils.ParamUtil._
/**
* sink metric and record to console, for debug
*/
-case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
+case class ConsoleSink(
+ config: Map[String, Any],
+ metricName: String,
+ timeStamp: Long) extends Sink {
val block: Boolean = true
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index c63fd09..e78a6a8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -25,12 +25,14 @@ import org.apache.spark.rdd.RDD
import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
import org.apache.griffin.measure.utils.ParamUtil._
-
/**
* sink metric and record through http request
*/
-case class ElasticSearchSink(config: Map[String, Any], metricName: String,
- timeStamp: Long, block: Boolean
+case class ElasticSearchSink(
+ config: Map[String, Any],
+ metricName: String,
+ timeStamp: Long,
+ block: Boolean
) extends Sink {
val Api = "api"
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
index d103b32..23fb48e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
@@ -28,15 +28,18 @@ import org.apache.griffin.measure.utils.ParamUtil._
/**
* sink metric and record to hdfs
*/
-case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
+case class HdfsSink(
+ config: Map[String, Any],
+ metricName: String,
+ timeStamp: Long) extends Sink {
val block: Boolean = true
- val Path = "path"
+ val PathKey = "path"
val MaxPersistLines = "max.persist.lines"
val MaxLinesPerFile = "max.lines.per.file"
- val path = config.getOrElse(Path, "").toString
+ val parentPath = config.getOrElse(PathKey, "").toString
val maxPersistLines = config.getInt(MaxPersistLines, -1)
val maxLinesPerFile = math.min(config.getInt(MaxLinesPerFile, 10000), 1000000)
@@ -49,7 +52,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
var _init = true
def available(): Boolean = {
- path.nonEmpty
+ parentPath.nonEmpty
}
private def logHead: String = {
@@ -70,7 +73,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
}
protected def filePath(file: String): String = {
- HdfsUtil.getHdfsFilePath(path, s"${metricName}/${timeStamp}/${file}")
+ HdfsUtil.getHdfsFilePath(parentPath, s"${metricName}/${timeStamp}/${file}")
}
protected def withSuffix(path: String, suffix: String): String = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
index c090201..0885762 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
@@ -32,9 +32,11 @@ import org.apache.griffin.measure.utils.TimeUtil
/**
* sink metric and record to mongo
*/
-case class MongoSink(config: Map[String, Any], metricName: String,
- timeStamp: Long, block: Boolean
- ) extends Sink {
+case class MongoSink(
+ config: Map[String, Any],
+ metricName: String,
+ timeStamp: Long,
+ block: Boolean) extends Sink {
MongoConnection.init(config)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
index b9f72da..3382546 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
@@ -23,11 +23,11 @@ import org.apache.spark.rdd.RDD
/**
* sink metric and record in multiple ways
*/
-case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
+case class MultiSinks(sinkIter: Iterable[Sink]) extends Sink {
val block: Boolean = false
- val headSinkOpt: Option[Sink] = sinks.headOption
+ val headSinkOpt: Option[Sink] = sinkIter.headOption
val metricName: String = headSinkOpt.map(_.metricName).getOrElse("")
@@ -35,13 +35,20 @@ case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
val config: Map[String, Any] = Map[String, Any]()
- def available(): Boolean = { sinks.exists(_.available()) }
+ def available(): Boolean = {
+ sinkIter.exists(_.available())
+ }
+
+ def start(msg: String): Unit = {
+ sinkIter.foreach(_.start(msg))
+ }
- def start(msg: String): Unit = { sinks.foreach(_.start(msg)) }
- def finish(): Unit = { sinks.foreach(_.finish()) }
+ def finish(): Unit = {
+ sinkIter.foreach(_.finish())
+ }
def log(rt: Long, msg: String): Unit = {
- sinks.foreach { sink =>
+ sinkIter.foreach { sink =>
try {
sink.log(rt, msg)
} catch {
@@ -51,7 +58,7 @@ case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
}
def sinkRecords(records: RDD[String], name: String): Unit = {
- sinks.foreach { sink =>
+ sinkIter.foreach { sink =>
try {
sink.sinkRecords(records, name)
} catch {
@@ -59,8 +66,9 @@ case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
}
}
}
+
def sinkRecords(records: Iterable[String], name: String): Unit = {
- sinks.foreach { sink =>
+ sinkIter.foreach { sink =>
try {
sink.sinkRecords(records, name)
} catch {
@@ -68,8 +76,9 @@ case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
}
}
}
+
def sinkMetrics(metrics: Map[String, Any]): Unit = {
- sinks.foreach { sink =>
+ sinkIter.foreach { sink =>
try {
sink.sinkMetrics(metrics)
} catch {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 49818f2..623de18 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -23,18 +23,18 @@ import scala.util.{Success, Try}
import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
import org.apache.griffin.measure.configuration.enums._
-
-
-case class SinkFactory(sinkParams: Iterable[SinkParam], metricName: String) extends Serializable {
+case class SinkFactory(sinkParamIter: Iterable[SinkParam],
+ metricName: String) extends Serializable {
/**
* create sink
- * @param timeStamp the timestamp of sink
- * @param block sink write metric in block or non-block way
- * @return sink
+ *
+ * @param timeStamp the timestamp of sink
+ * @param block sink write metric in block or non-block way
+ * @return sink
*/
def getSinks(timeStamp: Long, block: Boolean): MultiSinks = {
- MultiSinks(sinkParams.flatMap(param => getSink(timeStamp, param, block)))
+ MultiSinks(sinkParamIter.flatMap(param => getSink(timeStamp, param, block)))
}
private def getSink(timeStamp: Long, sinkParam: SinkParam, block: Boolean): Option[Sink] = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index d23dd46..ffb7e47 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -77,7 +77,6 @@ object HdfsUtil extends Loggable {
out.close
}
-
def getHdfsFilePath(parentPath: String, fileName: String): String = {
if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + seprator + fileName
}
@@ -91,9 +90,10 @@ object HdfsUtil extends Loggable {
}
}
-
- def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false)
- : Iterable[String] = {
+ def listSubPathsByType(
+ dirPath: String,
+ subType: String,
+ fullPath: Boolean = false) : Iterable[String] = {
if (existPath(dirPath)) {
try {
implicit val path = new Path(dirPath)
@@ -116,8 +116,10 @@ object HdfsUtil extends Loggable {
} else Nil
}
- def listSubPathsByTypes(dirPath: String, subTypes: Iterable[String], fullPath: Boolean = false)
- : Iterable[String] = {
+ def listSubPathsByTypes(
+ dirPath: String,
+ subTypes: Iterable[String],
+ fullPath: Boolean = false) : Iterable[String] = {
subTypes.flatMap { subType =>
listSubPathsByType(dirPath, subType, fullPath)
}