Posted to commits@griffin.apache.org by gu...@apache.org on 2020/08/10 02:49:51 UTC

[griffin] branch master updated: [GRIFFIN-305] Standardize sink hierarchy

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aa8995  [GRIFFIN-305] Standardize sink hierarchy
1aa8995 is described below

commit 1aa8995af89b723b6fab6f0922afa1916a2f8084
Author: chitralverma <ch...@gmail.com>
AuthorDate: Mon Aug 10 10:49:42 2020 +0800

    [GRIFFIN-305] Standardize sink hierarchy
    
    **What changes were proposed in this pull request?**
    Currently, the implementation of `Sinks` in Griffin has the issues listed below. This PR aims to fix them.
    - `Sinks` are based on the recursive `MultiSinks` class, which is a sink itself but whose underlying implementation is that of a `Seq`. This causes ambiguity and isn't very useful, so it has been removed.
    - Some unused code like `SinkContext` has been removed.
    - Data is converted from the performant DataFrame to an RDD while persisting in both streaming and batch pipelines. A new method `sinkBatchRecords` has been added to allow operations directly on the DataFrame for batch pipelines. Streaming still uses the old implementation, which will later be replaced with structured streaming.
    - Refactored the methods of `Sink`: `start`/`finish` were renamed to `open`/`close`, and the `jobName` that was previously (and incorrectly) passed as `metricName` is now named properly (see the trait sketch after this list).
    - Presently, only one instance of a sink with a given type can be defined in the env config. This does not allow configuring multiple sinks of the same type, such as HDFS or JDBC. A sink `name` has been added to the env config, and the job config now references the sinks to use by this name.
    - Updated all sinks as per the changes above, with some additional changes to `ConsoleSink`.
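    
    The refactored `Sink` trait (full definition in `measure/sink/Sink.scala` below) now exposes `validate()`, `open()`/`close()`, `sinkMetrics`, the RDD/Iterable based `sinkRecords` methods for streaming, and the new DataFrame based `sinkBatchRecords` for batch. A minimal sketch of a custom sink written against this trait is shown here for illustration only; the class name `MyCustomSink` and its logging behaviour are hypothetical and not part of this patch.
    
    ```scala
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame
    
    import org.apache.griffin.measure.sink.Sink
    
    // Hypothetical custom sink showing the refactored Sink API surface.
    case class MyCustomSink(
        config: Map[String, Any],
        jobName: String,
        timeStamp: Long,
        block: Boolean)
        extends Sink {
    
      // validate() replaces the old available() pre-requisite check
      def validate(): Boolean = true
    
      // open()/close() replace the old start()/finish() lifecycle hooks
      override def open(applicationId: String): Unit =
        griffinLogger.info(s"opened sink for job '$jobName', application '$applicationId'")
    
      override def close(): Unit =
        griffinLogger.info(s"closed sink for job '$jobName'")
    
      // metrics are still persisted as a Map
      override def sinkMetrics(metrics: Map[String, Any]): Unit =
        griffinLogger.info(s"$jobName [$timeStamp] metrics: $metrics")
    
      // batch pipelines now hand the DataFrame over directly, without an RDD conversion
      override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit =
        dataset.show(20, truncate = false)
    
      // streaming pipelines still use the RDD/Iterable based methods
      override def sinkRecords(records: RDD[String], name: String): Unit = {}
      override def sinkRecords(records: Iterable[String], name: String): Unit = {}
    }
    ```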
    
    **Does this PR introduce any user-facing change?**
    Yes. As mentioned above, the sink configuration has changed in both the env and job configs, as shown below.
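    
    For example, the updated sample configs in this patch define a named sink in the env config:
    
    ```
    "sinks": [
      {
        "name": "consoleSink",
        "type": "CONSOLE",
        "config": {
          "max.log.lines": 10
        }
      }
    ]
    ```
    
    and the job config selects it by that name:
    
    ```
    "sinks": [
      "consoleSink"
    ]
    ```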
    
    **How was this patch tested?**
    Griffin test suite and additional unit test cases
    
    Author: chitralverma <ch...@gmail.com>
    
    Closes #575 from chitralverma/standardize-sink-hierarchy.
---
 griffin-doc/measure/measure-batch-sample.md        |   8 +-
 griffin-doc/measure/measure-configuration-guide.md | 209 ++++++++++++++++++---
 griffin-doc/measure/measure-streaming-sample.md    |   8 +-
 measure/pom.xml                                    |   9 +-
 .../src/main/resources/config-batch-advanced.json  |   4 +-
 measure/src/main/resources/config-batch-es.json    |   2 +-
 measure/src/main/resources/config-batch-path.json  |   4 +-
 measure/src/main/resources/config-batch.json       |   4 +-
 measure/src/main/resources/config-streaming.json   |   2 +-
 measure/src/main/resources/env-batch.json          |   5 +-
 measure/src/main/resources/env-streaming.json      |   5 +-
 .../configuration/dqdefinition/DQConfig.scala      |   6 +-
 .../configuration/dqdefinition/EnvConfig.scala     |  25 ++-
 .../measure/configuration/enums/SinkType.scala     |   5 +-
 .../apache/griffin/measure/context/DQContext.scala |  12 +-
 .../org/apache/griffin/measure/launch/DQApp.scala  |  24 ++-
 .../griffin/measure/launch/batch/BatchDQApp.scala  |  46 +++--
 .../measure/launch/streaming/StreamingDQApp.scala  |  36 ++--
 .../apache/griffin/measure/sink/ConsoleSink.scala  |  70 +++----
 .../griffin/measure/sink/ElasticSearchSink.scala   |  16 +-
 .../org/apache/griffin/measure/sink/HdfsSink.scala |  79 ++++----
 .../apache/griffin/measure/sink/MongoSink.scala    |  23 +--
 .../apache/griffin/measure/sink/MultiSinks.scala   |  89 ---------
 .../org/apache/griffin/measure/sink/Sink.scala     |  45 ++++-
 .../apache/griffin/measure/sink/SinkFactory.scala  |  68 ++++---
 .../measure/step/write/MetricFlushStep.scala       |   8 +-
 .../measure/step/write/RecordWriteStep.scala       |  38 ++--
 .../SinkContext.scala => utils/CommonUtils.scala}  |  32 +++-
 .../test/resources/_accuracy-batch-griffindsl.json |   3 +-
 .../resources/_accuracy-streaming-griffindsl.json  |   4 +-
 .../resources/_completeness-batch-griffindsl.json  |   3 +-
 .../_completeness-streaming-griffindsl.json        |   4 +-
 .../_completeness_errorconf-batch-griffindsl.json  |   2 +-
 .../resources/_distinctness-batch-griffindsl.json  |   3 +-
 .../_distinctness-streaming-griffindsl.json        |   4 +-
 .../_profiling-batch-griffindsl-hive.json          |   4 +-
 .../resources/_profiling-batch-griffindsl.json     |   2 +-
 .../_profiling-batch-griffindsl_malformed.json     |   2 +-
 .../test/resources/_profiling-batch-sparksql.json  |   4 +-
 .../resources/_profiling-streaming-griffindsl.json |   4 +-
 .../resources/_timeliness-batch-griffindsl.json    |   3 +-
 .../_timeliness-streaming-griffindsl.json          |   4 +-
 .../resources/_uniqueness-batch-griffindsl.json    |   3 +-
 .../_uniqueness-streaming-griffindsl.json          |   4 +-
 measure/src/test/resources/env-batch.json          |   3 +-
 .../src/test/resources/env-streaming-mongo.json    |   3 +-
 measure/src/test/resources/env-streaming.json      |   2 +-
 .../invalidtype_completeness_batch_griffindal.json |   2 +-
 .../dqdefinition/reader/ParamEnumReaderSpec.scala  |   5 +-
 .../apache/griffin/measure/sink/CustomSink.scala   |  25 ++-
 .../griffin/measure/sink/CustomSinkTest.scala      |  47 ++++-
 51 files changed, 592 insertions(+), 430 deletions(-)

diff --git a/griffin-doc/measure/measure-batch-sample.md b/griffin-doc/measure/measure-batch-sample.md
index 1aee173..107985c 100644
--- a/griffin-doc/measure/measure-batch-sample.md
+++ b/griffin-doc/measure/measure-batch-sample.md
@@ -76,8 +76,8 @@ Apache Griffin measures consist of batch measure and streaming measure, this doc
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
 ```
@@ -139,8 +139,8 @@ The miss records of source will be persisted as record.
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
 ```
diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index a821da3..300a699 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -43,11 +43,13 @@ Apache Griffin measure module needs two configuration files to define the parame
 
   "sinks": [
     {
+      "name": "ConsoleSink",
       "type": "console",
       "config": {
         "max.log.lines": 100
       }
     }, {
+      "name": "HdfsSink",
       "type": "hdfs",
       "config": {
         "path": "hdfs:///griffin/streaming/persist",
@@ -79,31 +81,180 @@ Above lists environment parameters.
 	+ batch.interval: Interval of dumping streaming data, for streaming mode.
 	+ process.interval: Interval of processing dumped streaming data, for streaming mode.
 	+ config: Configuration of spark parameters.
-- **sinks**: This field configures list of metrics sink parameters, multiple sink ways are supported. Details of sink configuration [here](#sinks).
+- **sinks**: This field configures a list of sink definitions to persist both records and metrics. Details of sink configuration are available [here](#sinks).
 - **griffin.checkpoint**: This field configures list of griffin checkpoint parameters, multiple cache ways are supported. It is only for streaming dq case. Details of info cache configuration [here](#griffin-checkpoint).
 
 ### Sinks
-- **type**: Metrics and records sink type, "console", "hdfs", "http", "mongo", "custom". 
-- **config**: Configure parameters of each sink type.
-	+ console sink (aliases: "log")
-		* max.log.lines: the max lines of log.
-	+ hdfs sink
-		* path: hdfs path to sink metrics
-		* max.persist.lines: the max lines of total sink data.
-		* max.lines.per.file: the max lines of each sink file.
-	+ http sink (aliases: "es", "elasticsearch")
-		* api: api to submit sink metrics.
-		* method: http method, "post" default.
-    + mongo sink
-        * url: url of mongo db.
-        * database: database name.
-        * collection: collection name. 
-    + custom sink
-        * class: class name for user-provided data sink implementation
-        it should be implementing org.apache.griffin.measure.sink.Sink trait and have static method with signature
-		    ```def apply(ctx: SinkContext): Sink```. 
-        User-provided data sink should be present in Spark job's class path, by providing custom jar as -jar parameter
-		    to spark-submit or by adding to "jars" list in sparkProperties.json.
+Sinks allow persistence of job metrics and bad data (source records that violated the defined rules) to external 
+storage systems. 
+Sinks have to be defined in the Env Config, and their `name` is then referenced in the Job Config. 
+
+List of supported sinks:
+ - Console
+ - HDFS
+ - MongoDB
+ - ElasticSearch 
+ - Custom Implementations
+ 
+ #### Configuration
+  A sample sink configuration is as follows,
+  
+  ```
+...
+
+ "sinks": [
+     {
+       "name": "ConsoleSink",
+       "type": "CONSOLE",
+       "config": {
+         "numRows": 10,
+         "truncate": false
+       }
+     }
+   ]
+
+...
+  ```
+ 
+  ##### Key Parameters:
+  | Name    | Type     | Description                             | Supported Values                                 |
+  |:--------|:---------|:----------------------------------------|:-------------------------------------------------|
+  | name    | `String` | User defined unique name for Sink       |                                                  |
+  | type    | `String` | Type of Sink (Value is case insensitive)| console, hdfs, elasticsearch, mongodb, custom    |
+  | config  | `Object` | Configuration params of the sink        | Depends on sink type (see below)                 |
+ 
+  ##### For Custom Sinks:
+  - The **config** object must contain the key **class** whose value specifies the class name of the user-provided sink 
+  implementation. This class should implement the `org.apache.griffin.measure.sink.Sink` trait
+  - Example:
+       ```
+    ...
+    
+     "sinks": [
+         {
+           "name": "MyCustomSink",
+           "type": "CUSTOM",
+           "config": {
+             "class": "my.package.sink.MyCustomSinkImpl",
+             ...
+           }
+         }
+       ]
+    
+    ...
+       ```
+  
+  **Note:** The user-provided sink should be present in the Spark job's class path, either by providing a custom jar with 
+ the `--jars` parameter to spark-submit or by setting `spark.jars` in the `spark -> config` section of the environment config.  
+
+##### For Console Sink:
+  - Console Sink supports the following configurations. Other aliases like 'Log' may be used as the value for `type`.
+  
+     | Name           | Type     | Description                            | Default Values |
+     |:---------------|:---------|:---------------------------------------|:-------------- |
+     | numRows        | `Integer`| Number of records to log               | 20             |
+     | truncate       | `Boolean`| If true, strings more than 20 characters will be truncated and all cells will be aligned right| `true` |
+     
+ - Example:
+      ```
+     ...
+     
+      "sinks": [
+          {
+            "name": "ConsoleSink",
+            "type": "CONSOLE",
+            "config": {
+              "numRows": 10,
+              "truncate": false
+            }
+          }
+        ]
+     
+     ...
+      ```
+
+ ##### For HDFS Sink:
+   - HDFS Sink supports the following configurations
+   
+      | Name               | Type     | Description                            | Default Values |
+      |:-------------------|:---------|:---------------------------------------|:-------------- |
+      | path               | `String` | HDFS base path to sink metrics         |                |
+      | max.persist.lines  | `Integer`| the max lines of total sink data       | -1             |
+      | max.lines.per.file | `Integer`| the max lines of each sink file        | 1000000        |
+      
+  - Example:
+       ```
+      ...
+      
+       "sinks": [
+           {
+             "name": "hdfsSink",
+             "type": "HDFS",
+             "config": {
+               "path": "hdfs://localhost/griffin/batch/persist",
+               "max.persist.lines": 10000,
+               "max.lines.per.file": 10000
+             }
+           }
+         ]
+      
+      ...
+       ```
+ 
+  ##### For MongoDB Sink:
+  - MongoDB Sink supports the following configurations. Other aliases like 'Mongo' may be used as the value for `type`.
+  
+     | Name       | Type     | Description       | Default Values |
+     |:-----------|:---------|:------------------|:-------------- |
+     | url        | `String` | URL of MongoDB    |                |
+     | database   | `String` | Database name     |                |
+     | collection | `String` | Collection name   |                |
+     | over.time  | `Long`   | Wait Duration     | -1             |
+     | retry      | `Int`    | Number of retries | 10             |
+     
+ - Example:
+      ```
+     ...
+     
+      "sinks": [
+          {
+            "name": "MongoDBSink",
+            "type": "MongoDB",
+            "config": {
+              ...
+            }
+          }
+        ]
+     
+     ...
+      ```
+
+ ##### For Elasticsearch Sink:
+ - Elasticsearch Sink supports the following configurations. Other aliases like 'ES' and 'HTTP' may be used as the value for `type`.
+   
+      | Name               | Type     | Description                   | Default Values |
+      |:-------------------|:---------|:------------------------------|:-------------- |
+      | api                | `String` | api to submit sink metrics    |                |
+      | method             | `String` | http method, "post" default   |                |
+      | connection.timeout | `Long`   | Wait Duration                 | -1             |
+      | retry              | `Integer`| Number of retries             | 10             |
+      
+  - Example:
+       ```
+      ...
+      
+       "sinks": [
+           {
+             "name": "ElasticsearchSink",
+             "type": "Elasticsearch",
+             "config": {
+               ...
+             }
+           }
+         ]
+      
+      ...
+       ```
 
 ### Griffin Checkpoint
 - **type**: Griffin checkpoint type, "zk" for zookeeper checkpoint.
@@ -126,7 +277,6 @@ Above lists environment parameters.
       "name": "src",
       "connector": {
         "type": "AVRO",
-        "version": "1.7",
         "config": {
           "file.path": "<path>/<to>",
           "file.name": "<source-file>.avro"
@@ -137,7 +287,6 @@ Above lists environment parameters.
       "name": "tgt",
       "connector": {
         "type": "AVRO",
-        "version": "1.7",
         "config": {
           "file.path": "<path>/<to>",
           "file.name": "<target-file>.avro"
@@ -172,9 +321,9 @@ Above lists environment parameters.
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "HTTP",
-    "HDFS"
+    "CONSOLESink",
+    "HTTPSink",
+    "HDFSSink"
   ]
 }
 ```
@@ -206,21 +355,23 @@ List of supported data connectors:
  A sample data connector configuration is as following,
  
  ```
+...
+
 "connector": {
     "type": "file",
-    "version": "1.7",
     "config": {
       "key1": "value1",
       "key2": "value2"
     }
   }
+
+...
  ```
 
  ##### Key Parameters:
  | Name    | Type     | Description                            | Supported Values                                 |
  |:--------|:---------|:---------------------------------------|:-------------------------------------------------|
  | type    | `String` | Type of the Connector                  | file, hive, kafka (streaming only), jdbc, custom |
- | version | `String` | Version String of connector (optional) | Depends on connector type                        |
  | config  | `Object` | Configuration params of the connector  | Depends on connector type (see below)            |
 
  ##### For Custom Data Connectors:
diff --git a/griffin-doc/measure/measure-streaming-sample.md b/griffin-doc/measure/measure-streaming-sample.md
index b942778..8268d5f 100644
--- a/griffin-doc/measure/measure-streaming-sample.md
+++ b/griffin-doc/measure/measure-streaming-sample.md
@@ -139,8 +139,8 @@ Apache Griffin measures consist of batch measure and streaming measure, this doc
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
 ```
@@ -251,8 +251,8 @@ The miss records of source will be persisted as record.
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
 ```
diff --git a/measure/pom.xml b/measure/pom.xml
index a5f5dc6..460723b 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -52,6 +52,7 @@ under the License.
         <scalafmt.parameters>--diff --test</scalafmt.parameters>
         <scalafmt.skip>false</scalafmt.skip>
         <elasticsearch.version>6.4.1</elasticsearch.version>
+        <spark.scope>provided</spark.scope>
     </properties>
 
     <dependencies>
@@ -67,25 +68,25 @@ under the License.
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
-            <scope>provided</scope>
+            <scope>${spark.scope}</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
-            <scope>provided</scope>
+            <scope>${spark.scope}</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
-            <scope>provided</scope>
+            <scope>${spark.scope}</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
-            <scope>provided</scope>
+            <scope>${spark.scope}</scope>
             <exclusions>
                 <exclusion>
                     <groupId>commons-httpclient</groupId>
diff --git a/measure/src/main/resources/config-batch-advanced.json b/measure/src/main/resources/config-batch-advanced.json
index c6740e9..3035940 100644
--- a/measure/src/main/resources/config-batch-advanced.json
+++ b/measure/src/main/resources/config-batch-advanced.json
@@ -52,7 +52,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "consoleSink",
+    "elasticSink"
   ]
 }
diff --git a/measure/src/main/resources/config-batch-es.json b/measure/src/main/resources/config-batch-es.json
index 438bef3..72c92fd 100644
--- a/measure/src/main/resources/config-batch-es.json
+++ b/measure/src/main/resources/config-batch-es.json
@@ -47,6 +47,6 @@
     ]
   },
   "sinks": [
-    "CONSOLE"
+    "consoleSink"
   ]
 }
diff --git a/measure/src/main/resources/config-batch-path.json b/measure/src/main/resources/config-batch-path.json
index 883a76b..6231a01 100644
--- a/measure/src/main/resources/config-batch-path.json
+++ b/measure/src/main/resources/config-batch-path.json
@@ -35,7 +35,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "consoleSink",
+    "elasticSink"
   ]
 }
diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch.json
index d7bc337..dd11443 100644
--- a/measure/src/main/resources/config-batch.json
+++ b/measure/src/main/resources/config-batch.json
@@ -35,7 +35,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "consoleSink",
+    "elasticSink"
   ]
 }
diff --git a/measure/src/main/resources/config-streaming.json b/measure/src/main/resources/config-streaming.json
index 9828984..8596b29 100644
--- a/measure/src/main/resources/config-streaming.json
+++ b/measure/src/main/resources/config-streaming.json
@@ -77,6 +77,6 @@
     ]
   },
   "sinks": [
-    "ELASTICSEARCH"
+    "elasticSink"
   ]
 }
diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json
index f2a1639..bbec4e5 100644
--- a/measure/src/main/resources/env-batch.json
+++ b/measure/src/main/resources/env-batch.json
@@ -5,15 +5,16 @@
       "spark.master": "local[*]"
     }
   },
-
   "sinks": [
     {
+      "name": "consoleSink",
       "type": "CONSOLE",
       "config": {
         "max.log.lines": 10
       }
     },
     {
+      "name": "hdfsSink",
       "type": "HDFS",
       "config": {
         "path": "hdfs://localhost/griffin/batch/persist",
@@ -22,6 +23,7 @@
       }
     },
     {
+      "name": "elasticSink",
       "type": "ELASTICSEARCH",
       "config": {
         "method": "post",
@@ -31,6 +33,5 @@
       }
     }
   ],
-
   "griffin.checkpoint": []
 }
diff --git a/measure/src/main/resources/env-streaming.json b/measure/src/main/resources/env-streaming.json
index f5e303c..0c024b2 100644
--- a/measure/src/main/resources/env-streaming.json
+++ b/measure/src/main/resources/env-streaming.json
@@ -17,15 +17,16 @@
       "spark.hadoop.fs.hdfs.impl.disable.cache": true
     }
   },
-
   "sinks": [
     {
+      "name": "consoleSink",
       "type": "CONSOLE",
       "config": {
         "max.log.lines": 100
       }
     },
     {
+      "name": "hdfsSink",
       "type": "HDFS",
       "config": {
         "path": "hdfs://localhost/griffin/streaming/persist",
@@ -34,6 +35,7 @@
       }
     },
     {
+      "name": "elasticSink",
       "type": "ELASTICSEARCH",
       "config": {
         "method": "post",
@@ -41,7 +43,6 @@
       }
     }
   ],
-
   "griffin.checkpoint": [
     {
       "type": "zk",
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 9264185..f94a01a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -47,7 +47,7 @@ case class DQConfig(
     @JsonProperty("process.type") private val procType: String,
     @JsonProperty("data.sources") private val dataSources: List[DataSourceParam],
     @JsonProperty("evaluate.rule") private val evaluateRule: EvaluateRuleParam,
-    @JsonProperty("sinks") private val sinks: List[String])
+    @JsonProperty("sinks") private val sinks: List[String] = Nil)
     extends Param {
   def getName: String = name
   def getTimestampOpt: Option[Long] = if (timestamp != 0) Some(timestamp) else None
@@ -63,8 +63,8 @@ case class DQConfig(
       ._1
   }
   def getEvaluateRule: EvaluateRuleParam = evaluateRule
-  def getValidSinkTypes: Seq[SinkType] =
-    SinkType.validSinkTypes(if (sinks != null) sinks else Nil)
+  def getSinkNames: Seq[String] = sinks
+  def getValidSinkTypes: Seq[SinkType] = SinkType.validSinkTypes(sinks)
 
   def validate(): Unit = {
     assert(StringUtils.isNotBlank(name), "dq config name should not be blank")
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index 50468aa..e19d351 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -25,10 +25,11 @@ import org.apache.griffin.measure.configuration.enums.SinkType
 import org.apache.griffin.measure.configuration.enums.SinkType.SinkType
 
 /**
- * environment param
- * @param sparkParam         config of spark environment (must)
- * @param sinkParams         config of sink ways (optional)
- * @param checkpointParams   config of checkpoint locations (required in streaming mode)
+ * Model for Environment Config.
+ *
+ * @param sparkParam Job specific Spark Configs to override the Defaults set on the cluster
+ * @param sinkParams A [[Seq]] of sink definitions where records and metrics can be persisted
+ * @param checkpointParams Config of checkpoint locations (required in streaming mode)
  */
 @JsonInclude(Include.NON_NULL)
 case class EnvConfig(
@@ -45,6 +46,15 @@ case class EnvConfig(
     assert(sparkParam != null, "spark param should not be null")
     sparkParam.validate()
     getSinkParams.foreach(_.validate())
+    val repeatedSinks = sinkParams
+      .map(_.getName)
+      .groupBy(x => x)
+      .mapValues(_.size)
+      .filter(_._2 > 1)
+      .keys
+    assert(
+      repeatedSinks.isEmpty,
+      s"sink names must be unique. duplicate sink names ['${repeatedSinks.mkString("', '")}'] were found.")
     getCheckpointParams.foreach(_.validate())
   }
 }
@@ -88,13 +98,16 @@ case class SparkParam(
  */
 @JsonInclude(Include.NON_NULL)
 case class SinkParam(
+    @JsonProperty("name") private val name: String,
     @JsonProperty("type") private val sinkType: String,
-    @JsonProperty("config") private val config: Map[String, Any])
+    @JsonProperty("config") private val config: Map[String, Any] = Map.empty)
     extends Param {
+  def getName: String = name
   def getType: SinkType = SinkType.withNameWithDefault(sinkType)
-  def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
+  def getConfig: Map[String, Any] = config
 
   def validate(): Unit = {
+    assert(name != null, "sink name must be defined")
     assert(StringUtils.isNotBlank(sinkType), "sink type should not be empty")
   }
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 75d91d8..7264a88 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -35,12 +35,11 @@ object SinkType extends GriffinEnum {
   val Console, Log, Hdfs, Es, Http, ElasticSearch, MongoDB, Mongo, Custom =
     Value
 
-  def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
-    val seq = strs
+  def validSinkTypes(sinkTypeSeq: Seq[String]): Seq[SinkType] = {
+    sinkTypeSeq
       .map(s => SinkType.withNameWithDefault(s))
       .filter(_ != SinkType.Unknown)
       .distinct
-    if (seq.nonEmpty) seq else Seq(SinkType.ElasticSearch)
   }
 
   override def withNameWithDefault(name: String): enums.SinkType.Value = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 805a0c5..fa1468c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -77,16 +77,16 @@ case class DQContext(
   printTimeRanges()
 
   private val sinkFactory = SinkFactory(sinkParams, name)
-  private val defaultSink: Sink = createSink(contextId.timestamp)
+  private val defaultSinks: Seq[Sink] = createSinks(contextId.timestamp)
 
-  def getSink(timestamp: Long): Sink = {
-    if (timestamp == contextId.timestamp) getSink
-    else createSink(timestamp)
+  def getSinks(timestamp: Long): Seq[Sink] = {
+    if (timestamp == contextId.timestamp) getSinks
+    else createSinks(timestamp)
   }
 
-  def getSink: Sink = defaultSink
+  def getSinks: Seq[Sink] = defaultSinks
 
-  private def createSink(t: Long): Sink = {
+  private def createSinks(t: Long): Seq[Sink] = {
     procType match {
       case BatchProcessType => sinkFactory.getSinks(t, block = true)
       case StreamingProcessType => sinkFactory.getSinks(t, block = false)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index bc358fa..99e5683 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -19,6 +19,7 @@ package org.apache.griffin.measure.launch
 
 import scala.util.Try
 
+import org.apache.spark.metrics.sink.Sink
 import org.apache.spark.sql.SparkSession
 
 import org.apache.griffin.measure.Loggable
@@ -60,11 +61,26 @@ trait DQApp extends Loggable with Serializable {
     }
   }
 
+  /**
+   * Gets a valid [[Sink]] definition from the Env Config for each [[Sink]] defined in Job Config.
+   *
+   * @throws AssertionError if Env Config does not contain definition for a sink defined in Job Config
+   * @return [[Seq]] of [[Sink]] definitions
+   */
   protected def getSinkParams: Seq[SinkParam] = {
-    val validSinkTypes = dqParam.getValidSinkTypes
-    envParam.getSinkParams.flatMap { sinkParam =>
-      if (validSinkTypes.contains(sinkParam.getType)) Some(sinkParam) else None
-    }
+    val sinkParams = dqParam.getSinkNames
+      .map(_.toLowerCase())
+      .map { sinkName =>
+        (sinkName, envParam.getSinkParams.find(_.getName.toLowerCase().matches(sinkName)))
+      }
+
+    val missingSinks = sinkParams.filter(_._2.isEmpty).map(_._1)
+
+    assert(
+      missingSinks.isEmpty,
+      s"Sink(s) ['${missingSinks.mkString("', '")}'] not defined in env config.")
+
+    sinkParams.flatMap(_._2)
   }
 
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index dc1fb52..2407a77 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -17,7 +17,7 @@
 
 package org.apache.griffin.measure.launch.batch
 
-import java.util.Date
+import java.util.concurrent.TimeUnit
 
 import scala.util.Try
 
@@ -31,6 +31,7 @@ import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
+import org.apache.griffin.measure.utils.CommonUtils
 
 case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
 
@@ -60,45 +61,40 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
   }
 
   def run: Try[Boolean] = {
-    // start time
-    val startTime = new Date().getTime
+    val result = CommonUtils.timeThis({
+      val measureTime = getMeasureTime
+      val contextId = ContextId(measureTime)
 
-    val measureTime = getMeasureTime
-    val contextId = ContextId(measureTime)
+      // get data sources
+      val dataSources =
+        DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
+      dataSources.foreach(_.init())
 
-    // get data sources
-    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
-    dataSources.foreach(_.init())
+      // create dq context
+      dqContext =
+        DQContext(contextId, metricName, dataSources, sinkParams, BatchProcessType)(sparkSession)
 
-    // create dq context
-    dqContext =
-      DQContext(contextId, metricName, dataSources, sinkParams, BatchProcessType)(sparkSession)
+      // start id
+      val applicationId = sparkSession.sparkContext.applicationId
+      dqContext.getSinks.foreach(_.open(applicationId))
 
-    // start id
-    val applicationId = sparkSession.sparkContext.applicationId
-    dqContext.getSink.start(applicationId)
+      // build job
+      val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)
 
-    // build job
-    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.getEvaluateRule)
-
-    // dq job execute
-    val result = dqJob.execute(dqContext)
-
-    // end time
-    val endTime = new Date().getTime
-    dqContext.getSink.log(endTime, s"process using time: ${endTime - startTime} ms")
+      // dq job execute
+      dqJob.execute(dqContext)
+    }, TimeUnit.MILLISECONDS)
 
     // clean context
     dqContext.clean()
 
     // finish
-    dqContext.getSink.finish()
+    dqContext.getSinks.foreach(_.close())
 
     result
   }
 
   def close: Try[_] = Try {
-    sparkSession.close()
     sparkSession.stop()
   }
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index f91a003..09554f3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -101,7 +101,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
 
     // start id
     val applicationId = sparkSession.sparkContext.applicationId
-    globalContext.getSink.start(applicationId)
+    globalContext.getSinks.foreach(_.open(applicationId))
 
     // process thread
     val dqCalculator = StreamingDQCalculator(globalContext, dqParam.getEvaluateRule)
@@ -121,7 +121,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
     globalContext.clean()
 
     // finish
-    globalContext.getSink.finish()
+    globalContext.getSinks.foreach(_.close())
 
     true
   }
@@ -160,7 +160,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
       with Loggable {
 
     val lock: CheckpointLock = OffsetCheckpointClient.genLock("process")
-    val appSink: Sink = globalContext.getSink
+    val appSink: Iterable[Sink] = globalContext.getSinks
 
     var dqContext: DQContext = _
     var dqJob: DQJob = _
@@ -172,28 +172,30 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
       val locked = lock.lock(5, TimeUnit.SECONDS)
       if (locked) {
         try {
+          import org.apache.griffin.measure.utils.CommonUtils
 
           OffsetCheckpointClient.startOffsetCheckpoint()
 
-          val startTime = new Date().getTime
-          appSink.log(startTime, "starting process ...")
-          val contextId = ContextId(startTime)
+          CommonUtils.timeThis({
+            // start time
+            val startTime = new Date().getTime
 
-          // create dq context
-          dqContext = globalContext.cloneDQContext(contextId)
+            val contextId = ContextId(startTime)
 
-          // build job
-          dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
+            // create dq context
+            dqContext = globalContext.cloneDQContext(contextId)
 
-          // dq job execute
-          dqJob.execute(dqContext)
+            // build job
+            dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
 
-          // finish calculation
-          finishCalculation(dqContext)
+            // dq job execute
+            dqJob.execute(dqContext)
 
-          // end time
-          val endTime = new Date().getTime
-          appSink.log(endTime, s"process using time: ${endTime - startTime} ms")
+            // finish calculation
+            finishCalculation(dqContext)
+
+            // end time
+          }, TimeUnit.MILLISECONDS)
 
           OffsetCheckpointClient.endOffsetCheckpoint()
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 5bfa3e6..8459e88 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -18,67 +18,53 @@
 package org.apache.griffin.measure.sink
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
- * sink metric and record to console, for debug
+ * Console Sink for Records and Metrics.
+ * Records are shown in a tabular structure and Metrics are logged as JSON string.
+ *
+ * Supported Configurations:
+ *  - truncate : [[Boolean]] Whether to truncate long strings. If true, strings longer than 20 characters
+ *  will be truncated and all cells will be aligned right. Default is true.
+ *  - numRows : [[Int]] Number of rows to show. Default is 20.
  */
-case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp: Long)
-    extends Sink {
+case class ConsoleSink(config: Map[String, Any], jobName: String, timeStamp: Long) extends Sink {
 
   val block: Boolean = true
 
-  val MaxLogLines = "max.log.lines"
+  val Truncate: String = "truncate"
+  val truncateRecords: Boolean = config.getBoolean(Truncate, defValue = true)
 
-  val maxLogLines: Int = config.getInt(MaxLogLines, 100)
+  val NumberOfRows: String = "numRows"
+  val numRows: Int = config.getInt(NumberOfRows, 20)
 
-  def available(): Boolean = true
+  def validate(): Boolean = true
 
-  def start(msg: String): Unit = {
-    println(s"[$timeStamp] $metricName start: $msg")
-  }
-  def finish(): Unit = {
-    println(s"[$timeStamp] $metricName finish")
+  override def open(applicationId: String): Unit = {
+    griffinLogger.info(
+      s"Opened ConsoleSink for job with name '$jobName', " +
+        s"timestamp '$timeStamp' and applicationId '$applicationId'")
   }
 
-  def log(rt: Long, msg: String): Unit = {
-    println(s"[$timeStamp] $rt: $msg")
+  override def close(): Unit = {
+    griffinLogger.info(
+      s"Closed ConsoleSink for job with name '$jobName' and timestamp '$timeStamp'")
   }
 
-  def sinkRecords(records: RDD[String], name: String): Unit = {
-//    println(s"${metricName} [${timeStamp}] records: ")
-//    try {
-//      val recordCount = records.count
-//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
-//      val maxCount = count.toInt
-//      if (maxCount > 0) {
-//        val recordsArray = records.take(maxCount)
-//        recordsArray.foreach(println)
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-  }
+  override def sinkRecords(records: RDD[String], name: String): Unit = {}
+
+  override def sinkRecords(records: Iterable[String], name: String): Unit = {}
 
-  def sinkRecords(records: Iterable[String], name: String): Unit = {
-//    println(s"${metricName} [${timeStamp}] records: ")
-//    try {
-//      val recordCount = records.size
-//      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
-//      if (count > 0) {
-//        records.foreach(println)
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
+  override def sinkMetrics(metrics: Map[String, Any]): Unit = {
+    griffinLogger.info(s"$jobName [$timeStamp] metrics:\n${JsonUtil.toJson(metrics)}")
   }
 
-  def sinkMetrics(metrics: Map[String, Any]): Unit = {
-    println(s"$metricName [$timeStamp] metrics: ")
-    val json = JsonUtil.toJson(metrics)
-    println(json)
+  override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
+    dataset.show(numRows, truncateRecords)
   }
 
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index aac0969..d8cce41 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -20,6 +20,7 @@ package org.apache.griffin.measure.sink
 import scala.concurrent.Future
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 
 import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
 import org.apache.griffin.measure.utils.ParamUtil._
@@ -29,7 +30,7 @@ import org.apache.griffin.measure.utils.ParamUtil._
  */
 case class ElasticSearchSink(
     config: Map[String, Any],
-    metricName: String,
+    jobName: String,
     timeStamp: Long,
     block: Boolean)
     extends Sink {
@@ -49,13 +50,10 @@ case class ElasticSearchSink(
 
   val _Value = "value"
 
-  def available(): Boolean = {
+  def validate(): Boolean = {
     api.nonEmpty
   }
 
-  def start(msg: String): Unit = {}
-  def finish(): Unit = {}
-
   private def httpResult(dataMap: Map[String, Any]): Unit = {
     try {
       val data = JsonUtil.toJson(dataMap)
@@ -75,13 +73,9 @@ case class ElasticSearchSink(
 
   }
 
-  def log(rt: Long, msg: String): Unit = {}
-
-  def sinkRecords(records: RDD[String], name: String): Unit = {}
-  def sinkRecords(records: Iterable[String], name: String): Unit = {}
-
-  def sinkMetrics(metrics: Map[String, Any]): Unit = {
+  override def sinkMetrics(metrics: Map[String, Any]): Unit = {
     httpResult(metrics)
   }
 
+  override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {}
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
index 590e2d4..72a8324 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
@@ -17,18 +17,16 @@
 
 package org.apache.griffin.measure.sink
 
-import java.util.Date
-
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 
-import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
  * sink metric and record to hdfs
  */
-case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
+case class HdfsSink(config: Map[String, Any], jobName: String, timeStamp: Long) extends Sink {
 
   val block: Boolean = true
 
@@ -48,44 +46,44 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
 
   var _init = true
 
-  def available(): Boolean = {
+  def validate(): Boolean = {
     parentPath.nonEmpty
   }
 
-  private def logHead: String = {
-    if (_init) {
-      _init = false
-      val dt = new Date(timeStamp)
-      s"================ log of $dt ================\n"
-    } else ""
-  }
-
-  private def timeHead(rt: Long): String = {
-    val dt = new Date(rt)
-    s"--- $dt ---\n"
-  }
-
-  private def logWrap(rt: Long, msg: String): String = {
-    logHead + timeHead(rt) + s"$msg\n\n"
-  }
+//  private def logHead: String = {
+//    if (_init) {
+//      _init = false
+//      val dt = new Date(timeStamp)
+//      s"================ log of $dt ================\n"
+//    } else ""
+//  }
+//
+//  private def timeHead(rt: Long): String = {
+//    val dt = new Date(rt)
+//    s"--- $dt ---\n"
+//  }
+//
+//  private def logWrap(rt: Long, msg: String): String = {
+//    logHead + timeHead(rt) + s"$msg\n\n"
+//  }
 
   protected def filePath(file: String): String = {
-    HdfsUtil.getHdfsFilePath(parentPath, s"$metricName/$timeStamp/$file")
+    HdfsUtil.getHdfsFilePath(parentPath, s"$jobName/$timeStamp/$file")
   }
 
   protected def withSuffix(path: String, suffix: String): String = {
     s"$path.$suffix"
   }
 
-  def start(msg: String): Unit = {
+  override def open(applicationId: String): Unit = {
     try {
-      HdfsUtil.writeContent(StartFile, msg)
+      HdfsUtil.writeContent(StartFile, applicationId)
     } catch {
       case e: Throwable => error(e.getMessage, e)
     }
   }
 
-  def finish(): Unit = {
+  override def close(): Unit = {
     try {
       HdfsUtil.createEmptyFile(FinishFile)
     } catch {
@@ -93,16 +91,16 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     }
   }
 
-  def log(rt: Long, msg: String): Unit = {
-    try {
-      val logStr = logWrap(rt, msg)
-      HdfsUtil.withHdfsFile(LogFile) { out =>
-        out.write(logStr.getBytes("utf-8"))
-      }
-    } catch {
-      case e: Throwable => error(e.getMessage, e)
-    }
-  }
+//  def log(rt: Long, msg: String): Unit = {
+//    try {
+//      val logStr = logWrap(rt, msg)
+//      HdfsUtil.withHdfsFile(LogFile) { out =>
+//        out.write(logStr.getBytes("utf-8"))
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage, e)
+//    }
+//  }
 
   private def getHdfsPath(path: String, groupId: Int): String = {
     HdfsUtil.getHdfsFilePath(path, s"$groupId")
@@ -112,7 +110,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     HdfsUtil.deleteHdfsPath(path)
   }
 
-  def sinkRecords(records: RDD[String], name: String): Unit = {
+  override def sinkRecords(records: RDD[String], name: String): Unit = {
     val path = filePath(name)
     clearOldRecords(path)
     try {
@@ -146,7 +144,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     }
   }
 
-  def sinkRecords(records: Iterable[String], name: String): Unit = {
+  override def sinkRecords(records: Iterable[String], name: String): Unit = {
     val path = filePath(name)
     clearOldRecords(path)
     try {
@@ -174,7 +172,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     }
   }
 
-  def sinkMetrics(metrics: Map[String, Any]): Unit = {
+  override def sinkMetrics(metrics: Map[String, Any]): Unit = {
     try {
       val json = JsonUtil.toJson(metrics)
       sinkRecords2Hdfs(MetricsFile, json :: Nil)
@@ -195,4 +193,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     }
   }
 
+  override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
+    sinkRecords(dataset.toJSON.rdd, key.getOrElse(""))
+  }
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
index 59be39c..502cae8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
@@ -20,6 +20,7 @@ package org.apache.griffin.measure.sink
 import scala.concurrent.Future
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 import org.mongodb.scala._
 import org.mongodb.scala.model.{Filters, UpdateOptions, Updates}
 import org.mongodb.scala.result.UpdateResult
@@ -30,11 +31,7 @@ import org.apache.griffin.measure.utils.TimeUtil
 /**
  * sink metric and record to mongo
  */
-case class MongoSink(
-    config: Map[String, Any],
-    metricName: String,
-    timeStamp: Long,
-    block: Boolean)
+case class MongoSink(config: Map[String, Any], jobName: String, timeStamp: Long, block: Boolean)
     extends Sink {
 
   MongoConnection.init(config)
@@ -49,22 +46,17 @@ case class MongoSink(
   val _Timestamp = "timestamp"
   val _Value = "value"
 
-  def available(): Boolean = MongoConnection.dataConf.available
+  def validate(): Boolean = MongoConnection.dataConf.available
 
-  def start(msg: String): Unit = {}
-  def finish(): Unit = {}
+  override def sinkRecords(records: RDD[String], name: String): Unit = {}
+  override def sinkRecords(records: Iterable[String], name: String): Unit = {}
 
-  def log(rt: Long, msg: String): Unit = {}
-
-  def sinkRecords(records: RDD[String], name: String): Unit = {}
-  def sinkRecords(records: Iterable[String], name: String): Unit = {}
-
-  def sinkMetrics(metrics: Map[String, Any]): Unit = {
+  override def sinkMetrics(metrics: Map[String, Any]): Unit = {
     mongoInsert(metrics)
   }
 
   private val filter =
-    Filters.and(Filters.eq(_MetricName, metricName), Filters.eq(_Timestamp, timeStamp))
+    Filters.and(Filters.eq(_MetricName, jobName), Filters.eq(_Timestamp, timeStamp))
 
   private def mongoInsert(dataMap: Map[String, Any]): Unit = {
     try {
@@ -83,6 +75,7 @@ case class MongoSink(
     }
   }
 
+  override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {}
 }
 
 object MongoConnection {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
deleted file mode 100644
index dd45c9c..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.griffin.measure.sink
-
-import org.apache.spark.rdd.RDD
-
-/**
- * sink metric and record in multiple ways
- */
-case class MultiSinks(sinkIter: Iterable[Sink]) extends Sink {
-
-  val block: Boolean = false
-
-  val headSinkOpt: Option[Sink] = sinkIter.headOption
-
-  val metricName: String = headSinkOpt.map(_.metricName).getOrElse("")
-
-  val timeStamp: Long = headSinkOpt.map(_.timeStamp).getOrElse(0)
-
-  val config: Map[String, Any] = Map[String, Any]()
-
-  def available(): Boolean = {
-    sinkIter.exists(_.available())
-  }
-
-  def start(msg: String): Unit = {
-    sinkIter.foreach(_.start(msg))
-  }
-
-  def finish(): Unit = {
-    sinkIter.foreach(_.finish())
-  }
-
-  def log(rt: Long, msg: String): Unit = {
-    sinkIter.foreach { sink =>
-      try {
-        sink.log(rt, msg)
-      } catch {
-        case e: Throwable => error(s"log error: ${e.getMessage}", e)
-      }
-    }
-  }
-
-  def sinkRecords(records: RDD[String], name: String): Unit = {
-    sinkIter.foreach { sink =>
-      try {
-        sink.sinkRecords(records, name)
-      } catch {
-        case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
-      }
-    }
-  }
-
-  def sinkRecords(records: Iterable[String], name: String): Unit = {
-    sinkIter.foreach { sink =>
-      try {
-        sink.sinkRecords(records, name)
-      } catch {
-        case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
-      }
-    }
-  }
-
-  def sinkMetrics(metrics: Map[String, Any]): Unit = {
-    sinkIter.foreach { sink =>
-      try {
-        sink.sinkMetrics(metrics)
-      } catch {
-        case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
-      }
-    }
-  }
-
-}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
index 6cb6f26..2834e5b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
@@ -18,30 +18,57 @@
 package org.apache.griffin.measure.sink
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 
 import org.apache.griffin.measure.Loggable
 
 /**
- * sink metric and record
+ * Base trait for batch and Streaming Sinks.
+ * To implement custom sinks, extend your classes with this trait.
  */
 trait Sink extends Loggable with Serializable {
-  val metricName: String
+
+  val jobName: String
   val timeStamp: Long
 
   val config: Map[String, Any]
 
   val block: Boolean
 
-  def available(): Boolean
+  /**
+   * Ensures that the pre-requisites (if any) of the Sink are met before opening it.
+   */
+  def validate(): Boolean
+
+  /**
+   * Allows initialization of the connection to the sink (if required).
+   *
+   * @param applicationId Spark Application ID
+   */
+  def open(applicationId: String): Unit = {}
 
-  def start(msg: String): Unit
-  def finish(): Unit
+  /**
+   * Allows clean up for the sink (if required).
+   */
+  def close(): Unit = {}
 
-  def log(rt: Long, msg: String): Unit
+  /**
+   * Implementation of persisting records for streaming pipelines.
+   */
+  def sinkRecords(records: RDD[String], name: String): Unit = {}
 
-  def sinkRecords(records: RDD[String], name: String): Unit
-  def sinkRecords(records: Iterable[String], name: String): Unit
+  /**
+   * Implementation of persisting records for streaming pipelines.
+   */
+  def sinkRecords(records: Iterable[String], name: String): Unit = {}
 
-  def sinkMetrics(metrics: Map[String, Any]): Unit
+  /**
+   * Implementation of persisting metrics.
+   */
+  def sinkMetrics(metrics: Map[String, Any]): Unit = {}
 
+  /**
+   * Implementation of persisting records for batch pipelines.
+   */
+  def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {}
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 3deff4d..2dde859 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -24,34 +24,50 @@ import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums.SinkType._
 import org.apache.griffin.measure.utils.ParamUtil._
 
-case class SinkFactory(sinkParamIter: Iterable[SinkParam], metricName: String)
+/**
+ * SinkFactory, responsible for creation of Batch and Streaming Sinks based on the definition
+ * provided in Env Config.
+ *
+ * @param sinkParamIter [[Seq]] of sink definitions as [[SinkParam]]
+ * @param jobName name of the current Griffin Job
+ */
+case class SinkFactory(sinkParamIter: Seq[SinkParam], jobName: String)
     extends Loggable
     with Serializable {
 
   /**
-   * create sink
+   * Creates all the sinks defined in the Env Config.
    *
-   * @param timeStamp the timestamp of sink
-   * @param block     sink write metric in block or non-block way
-   * @return sink
+   * @param timeStamp epoch timestamp
+   * @param block persist in blocking or non-blocking way
+   * @return a [[Seq]] of [[Sink]] that were created successfully
    */
-  def getSinks(timeStamp: Long, block: Boolean): MultiSinks = {
-    MultiSinks(sinkParamIter.flatMap(param => getSink(timeStamp, param, block)))
+  def getSinks(timeStamp: Long, block: Boolean): Seq[Sink] = {
+    sinkParamIter.flatMap(param => getSink(timeStamp, param, block))
   }
 
+  /**
+   * Creates a [[Sink]] from the definition provided in the Env Config.
+   * Supported [[Sink]] are defined in [[SinkType]].
+   *
+   * @param timeStamp epoch timestamp
+   * @param sinkParam sink definition
+   * @param block persist in blocking or non-blocking way
+   * @return [[Some]](sink) if successfully created sink else [[None]]
+   */
   private def getSink(timeStamp: Long, sinkParam: SinkParam, block: Boolean): Option[Sink] = {
     val config = sinkParam.getConfig
     val sinkType = sinkParam.getType
     val sinkTry = sinkType match {
-      case Console => Try(ConsoleSink(config, metricName, timeStamp))
-      case Hdfs => Try(HdfsSink(config, metricName, timeStamp))
-      case ElasticSearch => Try(ElasticSearchSink(config, metricName, timeStamp, block))
-      case MongoDB => Try(MongoSink(config, metricName, timeStamp, block))
-      case Custom => Try(getCustomSink(config, metricName, timeStamp, block))
+      case Console => Try(ConsoleSink(config, jobName, timeStamp))
+      case Hdfs => Try(HdfsSink(config, jobName, timeStamp))
+      case ElasticSearch => Try(ElasticSearchSink(config, jobName, timeStamp, block))
+      case MongoDB => Try(MongoSink(config, jobName, timeStamp, block))
+      case Custom => Try(getCustomSink(config, timeStamp, block))
       case _ => throw new Exception(s"sink type $sinkType is not supported!")
     }
     sinkTry match {
-      case Success(sink) if sink.available() => Some(sink)
+      case Success(sink) if sink.validate() => Some(sink)
       case Failure(ex) =>
         error("Failed to get sink", ex)
         None
@@ -59,25 +75,17 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam], metricName: String)
   }
 
   /**
-   * Using custom sink
-   *
-   * how it might look in env.json:
+   * Creates a custom [[Sink]] via reflection from the provided class name.
+   * Refer to the measure configuration guide for more information on custom sinks.
    *
-   * "sinks": [
-   * {
-   * "type": "CUSTOM",
-   * "config": {
-   * "class": "com.yourcompany.griffin.sinks.MySuperSink",
-   * "path": "/Users/Shared"
-   * }
-   * },
+   * @throws ClassCastException when the provided class name does not extend [[Sink]]
+   * @param config values defined in the Env Config for the custom sink
+   * @param timeStamp epoch timestamp
+   * @param block whether to persist in a blocking or non-blocking manner
+   * @return [[Sink]] if created successfully
    *
    */
-  private def getCustomSink(
-      config: Map[String, Any],
-      metricName: String,
-      timeStamp: Long,
-      block: Boolean): Sink = {
+  private def getCustomSink(config: Map[String, Any], timeStamp: Long, block: Boolean): Sink = {
     val className = config.getString("class", "")
     val cls = Class.forName(className)
     if (classOf[Sink].isAssignableFrom(cls)) {
@@ -91,7 +99,7 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam], metricName: String)
         .invoke(
           null,
           config,
-          metricName.asInstanceOf[Object],
+          jobName.asInstanceOf[Object],
           timeStamp.asInstanceOf[Object],
           block.asInstanceOf[Object])
         .asInstanceOf[Sink]
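
End to end, the factory turns the env-config sink definitions into a flat Seq[Sink] that the rest of the pipeline simply iterates over. A usage sketch modelled on the CustomSinkTest below (the job name and config values are illustrative):

    // Named sink definitions, mirroring what the env config declares.
    val sinkParams: Seq[SinkParam] = Seq(
      SinkParam("consoleSink", "console", Map("max.log.lines" -> 10)),
      SinkParam("customSink", "custom", Map("class" -> "org.apache.griffin.measure.sink.CustomSink")))

    // Sinks that cannot be constructed are logged and dropped, not fatal.
    val sinks: Seq[Sink] = SinkFactory(sinkParams, "my_griffin_job")
      .getSinks(System.currentTimeMillis, block = true)
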
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index ed4bc54..4f4bf99 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -34,7 +34,13 @@ case class MetricFlushStep() extends WriteStep {
     context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
       val (t, metric) = pair
       val pr = try {
-        context.getSink(t).sinkMetrics(metric)
+        context.getSinks(t).foreach { sink =>
+          try {
+            sink.sinkMetrics(metric)
+          } catch {
+            case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+          }
+        }
         true
       } catch {
         case e: Throwable =>
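
The pattern here, iterate every sink but never let a single failing sink abort the whole step, reappears in RecordWriteStep below. If it keeps spreading, it could be pulled into a small shared helper; a sketch, assuming a Loggable context provides error() (the helper name is made up):

    // Hypothetical helper: apply f to each sink, logging per-sink failures
    // instead of letting them propagate and abort the write step.
    def forEachSinkSafely(sinks: Seq[Sink])(f: Sink => Unit): Unit =
      sinks.foreach { sink =>
        try f(sink)
        catch {
          case e: Throwable => error(s"sink error: ${e.getMessage}", e)
        }
      }
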
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
index 975bdc5..d327b6d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -17,9 +17,10 @@
 
 package org.apache.griffin.measure.step.write
 
+import scala.util.Try
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import scala.util.Try
 
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
@@ -47,7 +48,13 @@ case class RecordWriteStep(
         // write records
         recordsOpt match {
           case Some(records) =>
-            context.getSink(timestamp).sinkRecords(records, name)
+            context.getSinks(timestamp).foreach { sink =>
+              try {
+                sink.sinkBatchRecords(records, Option(name))
+              } catch {
+                case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
+              }
+            }
           case _ =>
         }
       case TimestampMode =>
@@ -56,12 +63,24 @@ case class RecordWriteStep(
         // write records
         recordsOpt.foreach { records =>
           records.foreach { pair =>
-            val (t, strs) = pair
-            context.getSink(t).sinkRecords(strs, name)
+            val (t, strRecords) = pair
+            context.getSinks(t).foreach { sink =>
+              try {
+                sink.sinkRecords(strRecords, name)
+              } catch {
+                case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
+              }
+            }
           }
         }
         emptyTimestamps.foreach { t =>
-          context.getSink(t).sinkRecords(Nil, name)
+          context.getSinks(t).foreach { sink =>
+            try {
+              sink.sinkRecords(Nil, name)
+            } catch {
+              case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
+            }
+          }
         }
     }
     true
@@ -86,14 +105,11 @@ case class RecordWriteStep(
     }
   }
 
-  private def getRecordDataFrame(context: DQContext): Option[DataFrame] =
-    getDataFrame(context, inputName)
-
   private def getFilterTableDataFrame(context: DQContext): Option[DataFrame] =
     filterTableNameOpt.flatMap(getDataFrame(context, _))
 
-  private def getBatchRecords(context: DQContext): Option[RDD[String]] = {
-    getRecordDataFrame(context).map(_.toJSON.rdd)
+  private def getBatchRecords(context: DQContext): Option[DataFrame] = {
+    getDataFrame(context, inputName)
   }
 
   private def getStreamingRecords(
@@ -101,7 +117,7 @@ case class RecordWriteStep(
     implicit val encoder: Encoder[(Long, String)] =
       Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
     val defTimestamp = context.contextId.timestamp
-    getRecordDataFrame(context) match {
+    getDataFrame(context, inputName) match {
       case Some(df) =>
         val (filterFuncOpt, emptyTimestamps) = getFilterTableDataFrame(context) match {
           case Some(filterDf) =>
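
With getBatchRecords now returning the DataFrame itself, the JSON conversion that used to happen in this step is deferred to each sink. A sink that still wants string records can recover them inside sinkBatchRecords, much as the CustomSink test does further down; a hedged fragment, assuming jobName is available on the sink as in CustomSink:

    override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
      // Reproduce the old behaviour locally: serialize rows to JSON strings first.
      sinkRecords(dataset.toJSON.rdd, key.getOrElse(jobName))
    }
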
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkContext.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala
similarity index 50%
rename from measure/src/main/scala/org/apache/griffin/measure/sink/SinkContext.scala
rename to measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala
index 2120aaf..268b3a0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/CommonUtils.scala
@@ -15,10 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.griffin.measure.sink
+package org.apache.griffin.measure.utils
 
-case class SinkContext(
-    config: Map[String, Any],
-    metricName: String,
-    timeStamp: Long,
-    block: Boolean)
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+
+object CommonUtils extends Loggable {
+
+  /**
+   * Executes a given code block and logs the time taken for its execution.
+   *
+   * @param f Arbitrary code block
+   * @param timeUnit unit in which the elapsed time is reported. Default: [[TimeUnit.SECONDS]]
+   * @tparam T result type of the code block
+   * @return result of type T
+   */
+  def timeThis[T](f: => T, timeUnit: TimeUnit = TimeUnit.SECONDS): T = {
+    val startNanos = System.nanoTime()
+    val result = f
+    val endNanos = System.nanoTime()
+
+    griffinLogger.info(s"Time taken: ${timeUnit
+      .convert(endNanos - startNanos, TimeUnit.NANOSECONDS)} ${timeUnit.name().toLowerCase}")
+
+    result
+  }
+}
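
A usage sketch of the new helper (df stands in for any existing DataFrame; the chosen unit is arbitrary):

    import java.util.concurrent.TimeUnit

    import org.apache.griffin.measure.utils.CommonUtils

    // Runs df.count() and logs something like "Time taken: 1432 milliseconds".
    val rowCount: Long = CommonUtils.timeThis(df.count(), TimeUnit.MILLISECONDS)
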
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json
index 19a49c3..7453b9e 100644
--- a/measure/src/test/resources/_accuracy-batch-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json
@@ -46,7 +46,6 @@
     ]
   },
   "sinks": [
-    "LOG",
-    "ELASTICSEARCH"
+    "consoleSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
index c7c1095..348b690 100644
--- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
@@ -119,7 +119,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESINK",
+    "ELASTICSEARCHSINK"
   ]
 }
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json
index c757624..a8fdcf7 100644
--- a/measure/src/test/resources/_completeness-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -31,7 +31,6 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESINK"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
index 114c12d..0fd2669 100644
--- a/measure/src/test/resources/_completeness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -62,7 +62,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
index cfe5326..52f3cca 100644
--- a/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
+++ b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
@@ -14,7 +14,7 @@
     }
   ],
   "sinks": [
-    "CONSOLE"
+    "CONSOLESink"
   ],
   "name": "test_griffin_complete_lizhao.bd",
   "evaluate.rule": {
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json
index bef2b50..39db17c 100644
--- a/measure/src/test/resources/_distinctness-batch-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json
@@ -51,7 +51,6 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
index 88540e4..d3a3ceb 100644
--- a/measure/src/test/resources/_distinctness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
@@ -86,7 +86,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
index d7df301..7847a59 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
@@ -46,7 +46,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json
index e1df3da..1503761 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl.json
@@ -52,6 +52,6 @@
     ]
   },
   "sinks": [
-    "CONSOLE"
+    "CONSOLESink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json b/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
index e8c72f9..10c6f35 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl_malformed.json
@@ -39,5 +39,5 @@
     ]
   },
 
-  "sinks": ["CONSOLE"]
+  "sinks": ["CONSOLESink"]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-batch-sparksql.json b/measure/src/test/resources/_profiling-batch-sparksql.json
index eaf0f89..e2a7ac4 100644
--- a/measure/src/test/resources/_profiling-batch-sparksql.json
+++ b/measure/src/test/resources/_profiling-batch-sparksql.json
@@ -42,7 +42,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json
index efe0929..30a0cba 100644
--- a/measure/src/test/resources/_profiling-streaming-griffindsl.json
+++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json
@@ -75,7 +75,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json
index f9be0fa..aa172b5 100644
--- a/measure/src/test/resources/_timeliness-batch-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json
@@ -48,7 +48,6 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
index 360ea88..f4c5060 100644
--- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
@@ -82,7 +82,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
index fe5eef5..e18cd32 100644
--- a/measure/src/test/resources/_uniqueness-batch-griffindsl.json
+++ b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
@@ -53,7 +53,6 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
index 7c57748..ea81b17 100644
--- a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
@@ -118,7 +118,7 @@
     ]
   },
   "sinks": [
-    "CONSOLE",
-    "ELASTICSEARCH"
+    "CONSOLESink",
+    "ELASTICSEARCHSink"
   ]
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/env-batch.json b/measure/src/test/resources/env-batch.json
index 3e1f7a6..de347c7 100644
--- a/measure/src/test/resources/env-batch.json
+++ b/measure/src/test/resources/env-batch.json
@@ -5,15 +5,14 @@
       "spark.master": "local[*]"
     }
   },
-
   "sinks": [
     {
+      "name": "consoleSink",
       "type": "console",
       "config": {
         "max.log.lines": 10
       }
     }
   ],
-
   "griffin.checkpoint": []
 }
\ No newline at end of file
diff --git a/measure/src/test/resources/env-streaming-mongo.json b/measure/src/test/resources/env-streaming-mongo.json
index b06d611..16d2f13 100644
--- a/measure/src/test/resources/env-streaming-mongo.json
+++ b/measure/src/test/resources/env-streaming-mongo.json
@@ -20,13 +20,14 @@
 
   "sinks": [
     {
+      "name": "consoleSink",
       "type": "console",
       "config": {
         "max.log.lines": 100
       }
     },
     {
-      "type": "mongo",
+      "type": "mongoSink",
       "config": {
         "url": "10.149.247.156",
         "database": "test",
diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json
index 34d469f..46b6660 100644
--- a/measure/src/test/resources/env-streaming.json
+++ b/measure/src/test/resources/env-streaming.json
@@ -20,7 +20,7 @@
 
   "sinks": [
     {
-      "type": "console",
+      "type": "consoleSink",
       "config": {
         "max.log.lines": 100
       }
diff --git a/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json b/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json
index be6435d..550b4ac 100644
--- a/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json
+++ b/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json
@@ -16,7 +16,7 @@
     }
   ],
   "sinks": [
-    "CONSOLE"
+    "CONSOLESink"
   ],
   "name": "test_griffin_complete",
   "evaluate.rule": {
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
index b881dff..efaa91f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
@@ -162,6 +162,7 @@ class ParamEnumReaderSpec extends FlatSpec with Matchers {
 
   "sinktype" should "be valid" in {
     import org.mockito.Mockito._
+
     import org.apache.griffin.measure.configuration.enums.SinkType._
     var dqConfig = DQConfig(
       "test",
@@ -184,10 +185,10 @@ class ParamEnumReaderSpec extends FlatSpec with Matchers {
     dqConfig =
       DQConfig("test", 1234, "", Nil, mock(classOf[EvaluateRuleParam]), List("Consol", "Logg"))
     dqConfig.getValidSinkTypes should not be Seq(Console)
-    dqConfig.getValidSinkTypes should be(Seq(ElasticSearch))
+    dqConfig.getValidSinkTypes should be(Seq())
 
     dqConfig = DQConfig("test", 1234, "", Nil, mock(classOf[EvaluateRuleParam]), List(""))
-    dqConfig.getValidSinkTypes should be(Seq(ElasticSearch))
+    dqConfig.getValidSinkTypes should be(Nil)
   }
 
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
index f95f349..3d0aa0e 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSink.scala
@@ -21,42 +21,39 @@ import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 
 /**
  * sink records and metrics in memory for test.
  *
  * @param config sink configurations
- * @param metricName
+ * @param jobName
  * @param timeStamp
  * @param block
  */
-case class CustomSink(
-    config: Map[String, Any],
-    metricName: String,
-    timeStamp: Long,
-    block: Boolean)
+case class CustomSink(config: Map[String, Any], jobName: String, timeStamp: Long, block: Boolean)
     extends Sink {
-  def available(): Boolean = true
-
-  def start(msg: String): Unit = {}
-
-  def finish(): Unit = {}
+  def validate(): Boolean = true
 
   def log(rt: Long, msg: String): Unit = {}
 
   val allRecords: ListBuffer[String] = mutable.ListBuffer[String]()
 
-  def sinkRecords(records: RDD[String], name: String): Unit = {
+  override def sinkRecords(records: RDD[String], name: String): Unit = {
     allRecords ++= records.collect()
   }
 
-  def sinkRecords(records: Iterable[String], name: String): Unit = {
+  override def sinkRecords(records: Iterable[String], name: String): Unit = {
     allRecords ++= records
   }
 
   val allMetrics: mutable.Map[String, Any] = mutable.Map[String, Any]()
 
-  def sinkMetrics(metrics: Map[String, Any]): Unit = {
+  override def sinkMetrics(metrics: Map[String, Any]): Unit = {
     allMetrics ++= metrics
   }
+
+  override def sinkBatchRecords(dataset: DataFrame, key: Option[String] = None): Unit = {
+    allRecords ++= dataset.toJSON.rdd.collect()
+  }
 }
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
index 8675be9..e4754e0 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -26,10 +26,13 @@ import org.apache.griffin.measure.step.write.{MetricFlushStep, MetricWriteStep,
 class CustomSinkTest extends SinkTestBase {
 
   val sinkParam: SinkParam =
-    SinkParam("custom", Map("class" -> "org.apache.griffin.measure.sink.CustomSink"))
+    SinkParam(
+      "customSink",
+      "custom",
+      Map("class" -> "org.apache.griffin.measure.sink.CustomSink"))
   override var sinkParams = Seq(sinkParam)
 
-  def withCustomSink[A](func: MultiSinks => A): A = {
+  def withCustomSink[A](func: Iterable[Sink] => A): A = {
     val sinkFactory = SinkFactory(sinkParams, "Test Sink Factory")
     val timestamp = System.currentTimeMillis
     val sinks = sinkFactory.getSinks(timestamp, block = true)
@@ -38,9 +41,21 @@ class CustomSinkTest extends SinkTestBase {
 
   "custom sink" can "sink metrics" in {
     val actualMetrics = withCustomSink(sinks => {
-      sinks.sinkMetrics(Map("sum" -> 10))
-      sinks.sinkMetrics(Map("count" -> 5))
-      sinks.headSinkOpt match {
+      sinks.foreach { sink =>
+        try {
+          sink.sinkMetrics(Map("sum" -> 10))
+        } catch {
+          case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+        }
+      }
+      sinks.foreach { sink =>
+        try {
+          sink.sinkMetrics(Map("count" -> 5))
+        } catch {
+          case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+        }
+      }
+      sinks.headOption match {
         case Some(sink: CustomSink) => sink.allMetrics
         case _ => mutable.ListBuffer[String]()
       }
@@ -53,10 +68,22 @@ class CustomSinkTest extends SinkTestBase {
   "custom sink" can "sink records" in {
     val actualRecords = withCustomSink(sinks => {
       val rdd1 = createDataFrame(1 to 2)
-      sinks.sinkRecords(rdd1.toJSON.rdd, "test records")
+      sinks.foreach { sink =>
+        try {
+          sink.sinkRecords(rdd1.toJSON.rdd, "test records")
+        } catch {
+          case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
+        }
+      }
       val rdd2 = createDataFrame(2 to 4)
-      sinks.sinkRecords(rdd2.toJSON.rdd, "test records")
-      sinks.headSinkOpt match {
+      sinks.foreach { sink =>
+        try {
+          sink.sinkRecords(rdd2.toJSON.rdd, "test records")
+        } catch {
+          case e: Throwable => error(s"sink records error: ${e.getMessage}", e)
+        }
+      }
+      sinks.headOption match {
         case Some(sink: CustomSink) => sink.allRecords
         case _ =>
       }
@@ -84,7 +111,7 @@ class CustomSinkTest extends SinkTestBase {
     val dQContext = getDqContext()
     RecordWriteStep(rwName, resultTable).execute(dQContext)
 
-    val actualRecords = dQContext.getSink.asInstanceOf[MultiSinks].headSinkOpt match {
+    val actualRecords = dQContext.getSinks.headOption match {
       case Some(sink: CustomSink) => sink.allRecords
       case _ => mutable.ListBuffer[String]()
     }
@@ -122,7 +149,7 @@ class CustomSinkTest extends SinkTestBase {
 
     metricWriteStep.execute(dQContext)
     MetricFlushStep().execute(dQContext)
-    val actualMetrics = dQContext.getSink.asInstanceOf[MultiSinks].headSinkOpt match {
+    val actualMetrics = dQContext.getSinks.headOption match {
       case Some(sink: CustomSink) => sink.allMetrics
       case _ => mutable.Map[String, Any]()
     }