You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by li...@apache.org on 2019/05/15 05:48:18 UTC

[griffin] branch master updated: Minor Refactor Codes

This is an automated email from the ASF dual-hosted git repository.

liujin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 81cc6c8  Minor Refactor Codes
81cc6c8 is described below

commit 81cc6c81d791772fb61b6da58c80266d937ffa8e
Author: Eugene <li...@apache.org>
AuthorDate: Wed May 15 13:47:34 2019 +0800

    Minor Refactor Codes
    
    1. Update sink code
    2. Trim sink code
    
    Author: Eugene <li...@apache.org>
    
    Closes #501 from toyboxman/refactor.
---
 .../apache/griffin/measure/sink/ConsoleSink.scala  |  5 +++-
 .../griffin/measure/sink/ElasticSearchSink.scala   |  8 ++++---
 .../org/apache/griffin/measure/sink/HdfsSink.scala | 13 +++++++----
 .../apache/griffin/measure/sink/MongoSink.scala    |  8 ++++---
 .../apache/griffin/measure/sink/MultiSinks.scala   | 27 ++++++++++++++--------
 .../apache/griffin/measure/sink/SinkFactory.scala  | 14 +++++------
 .../apache/griffin/measure/utils/HdfsUtil.scala    | 14 ++++++-----
 7 files changed, 55 insertions(+), 34 deletions(-)

diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index feebd91..20f4e13 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -26,7 +26,10 @@ import org.apache.griffin.measure.utils.ParamUtil._
 /**
   * sink metric and record to console, for debug
   */
-case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
+case class ConsoleSink(
+                        config: Map[String, Any],
+                        metricName: String,
+                        timeStamp: Long) extends Sink {
 
   val block: Boolean = true
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index c63fd09..e78a6a8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -25,12 +25,14 @@ import org.apache.spark.rdd.RDD
 import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
 import org.apache.griffin.measure.utils.ParamUtil._
 
-
 /**
   * sink metric and record through http request
   */
-case class ElasticSearchSink(config: Map[String, Any], metricName: String,
-                             timeStamp: Long, block: Boolean
+case class ElasticSearchSink(
+                              config: Map[String, Any],
+                              metricName: String,
+                              timeStamp: Long,
+                              block: Boolean
                             ) extends Sink {
 
   val Api = "api"
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
index d103b32..23fb48e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
@@ -28,15 +28,18 @@ import org.apache.griffin.measure.utils.ParamUtil._
 /**
   * sink metric and record to hdfs
   */
-case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
+case class HdfsSink(
+                     config: Map[String, Any],
+                     metricName: String,
+                     timeStamp: Long) extends Sink {
 
   val block: Boolean = true
 
-  val Path = "path"
+  val PathKey = "path"
   val MaxPersistLines = "max.persist.lines"
   val MaxLinesPerFile = "max.lines.per.file"
 
-  val path = config.getOrElse(Path, "").toString
+  val parentPath = config.getOrElse(PathKey, "").toString
   val maxPersistLines = config.getInt(MaxPersistLines, -1)
   val maxLinesPerFile = math.min(config.getInt(MaxLinesPerFile, 10000), 1000000)
 
@@ -49,7 +52,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
   var _init = true
 
   def available(): Boolean = {
-    path.nonEmpty
+    parentPath.nonEmpty
   }
 
   private def logHead: String = {
@@ -70,7 +73,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
   }
 
   protected def filePath(file: String): String = {
-    HdfsUtil.getHdfsFilePath(path, s"${metricName}/${timeStamp}/${file}")
+    HdfsUtil.getHdfsFilePath(parentPath, s"${metricName}/${timeStamp}/${file}")
   }
 
   protected def withSuffix(path: String, suffix: String): String = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
index c090201..0885762 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
@@ -32,9 +32,11 @@ import org.apache.griffin.measure.utils.TimeUtil
 /**
   * sink metric and record to mongo
   */
-case class MongoSink(config: Map[String, Any], metricName: String,
-                     timeStamp: Long, block: Boolean
-                       ) extends Sink {
+case class MongoSink(
+                      config: Map[String, Any],
+                      metricName: String,
+                      timeStamp: Long,
+                      block: Boolean) extends Sink {
 
   MongoConnection.init(config)
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
index b9f72da..3382546 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
@@ -23,11 +23,11 @@ import org.apache.spark.rdd.RDD
 /**
   * sink metric and record in multiple ways
   */
-case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
+case class MultiSinks(sinkIter: Iterable[Sink]) extends Sink {
 
   val block: Boolean = false
 
-  val headSinkOpt: Option[Sink] = sinks.headOption
+  val headSinkOpt: Option[Sink] = sinkIter.headOption
 
   val metricName: String = headSinkOpt.map(_.metricName).getOrElse("")
 
@@ -35,13 +35,20 @@ case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
 
   val config: Map[String, Any] = Map[String, Any]()
 
-  def available(): Boolean = { sinks.exists(_.available()) }
+  def available(): Boolean = {
+    sinkIter.exists(_.available())
+  }
+
+  def start(msg: String): Unit = {
+    sinkIter.foreach(_.start(msg))
+  }
 
-  def start(msg: String): Unit = { sinks.foreach(_.start(msg)) }
-  def finish(): Unit = { sinks.foreach(_.finish()) }
+  def finish(): Unit = {
+    sinkIter.foreach(_.finish())
+  }
 
   def log(rt: Long, msg: String): Unit = {
-    sinks.foreach { sink =>
+    sinkIter.foreach { sink =>
       try {
         sink.log(rt, msg)
       } catch {
@@ -51,7 +58,7 @@ case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
   }
 
   def sinkRecords(records: RDD[String], name: String): Unit = {
-    sinks.foreach { sink =>
+    sinkIter.foreach { sink =>
       try {
         sink.sinkRecords(records, name)
       } catch {
@@ -59,8 +66,9 @@ case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
       }
     }
   }
+
   def sinkRecords(records: Iterable[String], name: String): Unit = {
-    sinks.foreach { sink =>
+    sinkIter.foreach { sink =>
       try {
         sink.sinkRecords(records, name)
       } catch {
@@ -68,8 +76,9 @@ case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
       }
     }
   }
+
   def sinkMetrics(metrics: Map[String, Any]): Unit = {
-    sinks.foreach { sink =>
+    sinkIter.foreach { sink =>
       try {
         sink.sinkMetrics(metrics)
       } catch {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 49818f2..623de18 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -23,18 +23,18 @@ import scala.util.{Success, Try}
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums._
 
-
-
-case class SinkFactory(sinkParams: Iterable[SinkParam], metricName: String) extends Serializable {
+case class SinkFactory(sinkParamIter: Iterable[SinkParam],
+                       metricName: String) extends Serializable {
 
   /**
     * create sink
-    * @param timeStamp    the timestamp of sink
-    * @param block        sink write metric in block or non-block way
-    * @return   sink
+    *
+    * @param timeStamp the timestamp of sink
+    * @param block     sink write metric in block or non-block way
+    * @return sink
     */
   def getSinks(timeStamp: Long, block: Boolean): MultiSinks = {
-    MultiSinks(sinkParams.flatMap(param => getSink(timeStamp, param, block)))
+    MultiSinks(sinkParamIter.flatMap(param => getSink(timeStamp, param, block)))
   }
 
   private def getSink(timeStamp: Long, sinkParam: SinkParam, block: Boolean): Option[Sink] = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index d23dd46..ffb7e47 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -77,7 +77,6 @@ object HdfsUtil extends Loggable {
     out.close
   }
 
-
   def getHdfsFilePath(parentPath: String, fileName: String): String = {
     if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + seprator + fileName
   }
@@ -91,9 +90,10 @@ object HdfsUtil extends Loggable {
     }
   }
 
-
-  def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false)
-    : Iterable[String] = {
+  def listSubPathsByType(
+                          dirPath: String,
+                          subType: String,
+                          fullPath: Boolean = false) : Iterable[String] = {
     if (existPath(dirPath)) {
       try {
         implicit val path = new Path(dirPath)
@@ -116,8 +116,10 @@ object HdfsUtil extends Loggable {
     } else Nil
   }
 
-  def listSubPathsByTypes(dirPath: String, subTypes: Iterable[String], fullPath: Boolean = false)
-    : Iterable[String] = {
+  def listSubPathsByTypes(
+                           dirPath: String,
+                           subTypes: Iterable[String],
+                           fullPath: Boolean = false) : Iterable[String] = {
     subTypes.flatMap { subType =>
       listSubPathsByType(dirPath, subType, fullPath)
     }