You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by ch...@apache.org on 2021/03/09 09:03:23 UTC

[griffin] branch master updated: [GRIFFIN-345] Support cross-version compilation for Scala and Spark dependencies

This is an automated email from the ASF dual-hosted git repository.

chitralverma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 076c8d0  [GRIFFIN-345] Support cross-version compilation for Scala and Spark dependencies
076c8d0 is described below

commit 076c8d08abf9f825dde22213e35e1b24f50cb1f2
Author: chitralverma <ch...@gmail.com>
AuthorDate: Tue Mar 9 14:33:02 2021 +0530

    [GRIFFIN-345] Support cross-version compilation for Scala and Spark dependencies
    
    **What changes were proposed in this pull request?**
    
    _This PR affects only the measure module._
    
    In newer environments, especially cloud platforms, the Griffin measure module may face compatibility issues due to the old Scala and Spark versions. To remedy this, the following topics are covered in this ticket:
    
    - Cross-compilation across scala major versions (2.11, 2.12)
    - Update Spark Version (2.4+)
    - Create maven profiles to build different scala and spark versions
    - Changes to build strategy
    
    This process is also used in Apache Spark to build for different versions of Scala and Hadoop.
    
    **Does this PR introduce any user-facing change?**
    No
    
    **How was this patch tested?**
    Via maven build process.
    
    Closes #589 from chitralverma/cross-version-build.
    
    Authored-by: chitralverma <ch...@gmail.com>
    Signed-off-by: chitralverma <ch...@gmail.com>
---
 .gitignore                                         |   3 +-
 griffin-doc/deploy/measure-build-guide.md          |  74 +++
 measure/assembly.xml                               |  64 ---
 measure/pom.xml                                    | 527 +++++++++++++++------
 .../datasource/cache/StreamingCacheClient.scala    |  17 +-
 .../connector/DataConnectorFactory.scala           |  11 +-
 .../connector/batch/CassandraDataConnector.scala   |  53 ++-
 .../batch/ElasticSearchGriffinDataConnector.scala  | 105 ++--
 .../connector/batch/FileBasedDataConnector.scala   |  14 +-
 .../connector/batch/JDBCBasedDataConnector.scala   |  41 +-
 .../connector/batch/MySqlDataConnector.scala       |  31 +-
 .../streaming/KafkaStreamingDataConnector.scala    |  35 +-
 .../dsl/transform/analyzer/BasicAnalyzer.scala     |   2 +-
 .../measure/step/write/MetricFlushStep.scala       |  25 +-
 .../org/apache/griffin/measure/utils/FSUtil.scala  |  26 +-
 .../apache/griffin/measure/utils/ThreadUtils.scala |  14 +-
 .../apache/griffin/measure/SparkSuiteBase.scala    |   5 +-
 .../dqdefinition/reader/ParamEnumReaderSpec.scala  |   6 +-
 .../dqdefinition/reader/ParamFileReaderSpec.scala  |   6 +-
 .../dqdefinition/reader/ParamJsonReaderSpec.scala  |   7 +-
 .../measure/context/DataFrameCacheTest.scala       |   5 +-
 .../measure/context/MetricWrapperTest.scala        |   5 +-
 .../griffin/measure/context/TimeRangeTest.scala    |   5 +-
 .../measure/datasource/TimestampStorageTest.scala  |   5 +-
 .../connector/DataConnectorFactorySpec.scala       |   4 +-
 .../batch/ElasticSearchDataConnectorTest.scala     |   4 +-
 .../batch/FileBasedDataConnectorTest.scala         |  15 +-
 .../batch/JDBCBasedDataConnectorTest.scala         |   2 +-
 .../org/apache/griffin/measure/job/DQAppTest.scala |   9 +-
 .../griffin/measure/sink/CustomSinkTest.scala      |   2 +-
 .../apache/griffin/measure/sink/SinkTestBase.scala |   5 +-
 .../griffin/measure/step/TransformStepTest.scala   |   6 +-
 .../transform/CompletenessExpr2DQStepsTest.scala   |  10 +-
 .../AccuracyTransformationsIntegrationTest.scala   |  15 +-
 .../griffin/measure/utils/ParamUtilTest.scala      |   6 +-
 pom.xml                                            |  49 +-
 service/pom.xml                                    |   3 +-
 37 files changed, 752 insertions(+), 464 deletions(-)

diff --git a/.gitignore b/.gitignore
index cd6b316..ef3e99e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ target/**
 *.jar
 *.war
 *.ear
+*.cfg
 target
 service/src/main/resources/public/
 ui/.tmp/
@@ -21,7 +22,7 @@ ui/.tmp/
 .settings/
 .classpath
 bin
-
+**/site-packages/**
 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
 # don't need binary 
diff --git a/griffin-doc/deploy/measure-build-guide.md b/griffin-doc/deploy/measure-build-guide.md
new file mode 100644
index 0000000..8914bc6
--- /dev/null
+++ b/griffin-doc/deploy/measure-build-guide.md
@@ -0,0 +1,74 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Griffin Build Guide - Measure Module
+
+Like other modules of Apache Griffin, `measure` module is also built using Maven build tool. Building `measure` module
+requires Maven version 3.5+ and Java 8.
+
+## Version Compatibility
+
+Starting from Apache Griffin 0.7, the `measure` module will be (scala-spark) cross version compatible. Since both Scala
+and Spark are dependencies for Apache Griffin, details of Spark-Scala cross version compatibility are listed below,
+
+|           | Spark 2.3.x | Spark 2.4.x | Spark 3.0.x |
+| --------- |:-----------:|:-----------:|:-----------:|
+| Scala 2.11| ✓           | ✓           | x           |
+| Scala 2.12| x           | ✓           | ✓           |
+
+## Building a Distribution
+
+Execute the below commands to build `measure` with desired version of Spark and Scala,
+
+By default, the build is compiled with Scala 2.11 and Apache Spark 2.4.x.
+
+```
+# For measure module with Scala 2.11 and Spark 2.4.x
+mvn clean package
+```
+
+To change Scala or Spark version you can use the commands below,
+
+```
+# For measure module with Scala 2.12 and Spark 2.4.x
+mvn clean package -Dscala-2.12
+```
+
+```
+# For measure module with Scala 2.11 and Spark 2.3.x
+mvn clean package -Dspark-2.3
+```
+
+```
+# For measure module with Scala 2.12 and Spark 3.0.x
+mvn clean package -Dscala-2.12 -Dspark-3.0 
+```
+
+Note: 
+ - Using the `-Dscala-2.12` and `-Dspark-2.3` options together will cause a build failure due to missing dependencies, as
+   this combination is not cross-compiled; see details [here](#version-compatibility)
+ - Using the `-Dspark-3.0` option without the `-Dscala-2.12` option will cause a build failure due to missing dependencies, as
+   this combination is not cross-compiled; see details [here](#version-compatibility)
+
+### AVRO Source Support
+
+Starting with Spark 2.4.x, AVRO source (`spark-avro` package) was migrated from `com.databricks` group to `org.apache.spark`.
+Additionally, the older dependency does not support scala 2.12.
+
+All builds of `measure` module will contain AVRO source support by default.
diff --git a/measure/assembly.xml b/measure/assembly.xml
deleted file mode 100644
index 7890497..0000000
--- a/measure/assembly.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>package</id>
-  <formats>
-    <format>tar.gz</format>
-  </formats>
-  <fileSets>
-    <fileSet>
-      <directory>${project.basedir}/sbin</directory>
-      <outputDirectory>/bin</outputDirectory>
-      <includes>
-        <include>/**</include>
-      </includes>
-      <lineEnding>unix</lineEnding>
-      <fileMode>0777</fileMode>
-      <directoryMode>0755</directoryMode>
-    </fileSet>
-    <fileSet>
-      <directory>${project.basedir}/src/main/resources</directory>
-      <outputDirectory>/conf</outputDirectory>
-      <includes>
-        <include>/**</include>
-      </includes>
-    </fileSet>
-  </fileSets>
-
-  <dependencySets>
-    <dependencySet>
-      <useProjectArtifact>true</useProjectArtifact>
-      <outputDirectory>/lib</outputDirectory>
-      <unpack>false</unpack>
-      <scope>provided</scope>
-      <excludes>
-        <exclude>com.twitter:parquet-hadoop-bundle</exclude>
-        <exclude>io.dropwizard.metrics:*</exclude>
-        <exclude>org.glassfish.jersey.core:*</exclude>
-        <exclude>org.glassfish.jersey.containers:*</exclude>
-        <exclude>org.apache.thrift:*</exclude>
-        <exclude>org.apache.parquet:*</exclude>
-        <exclude>org.apache.hadoop:*</exclude>
-        <exclude>org.apache.spark:*</exclude>
-      </excludes>
-    </dependencySet>
-  </dependencySets>
-</assembly>
diff --git a/measure/pom.xml b/measure/pom.xml
index 5408d37..3cd8bc6 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -17,7 +17,8 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -29,290 +30,308 @@ under the License.
     <artifactId>measure</artifactId>
     <packaging>jar</packaging>
 
-    <name>Apache Griffin :: Measures</name>
-    <url>http://maven.apache.org</url>
+    <name>Apache Griffin :: Measure</name>
+    <url>http://griffin.apache.org</url>
 
     <properties>
-        <scala.version>2.11.8</scala.version>
-        <spark.version>2.2.1</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <avro.version>1.7.7</avro.version>
-        <jackson.version>2.8.7</jackson.version>
+
+        <!-- Spark -->
+        <spark.major.version>2.4</spark.major.version>
+        <spark.version>${spark.major.version}.4</spark.version>
+        <spark.scope>${provided.scope}</spark.scope>
+        <spark-streaming-kafka.version>${spark.version}</spark-streaming-kafka.version>
+
+        <!-- Code Standardization -->
+        <scalastyle.version>1.0.0</scalastyle.version>
+        <scalafmt.version>1.0.3</scalafmt.version>
+
+        <!-- Data Connectors -->
+        <mysql.java.version>5.1.47</mysql.java.version>
+        <mariadb.version>2.7.0</mariadb.version>
+        <cassandra.connector.version>2.5.1</cassandra.connector.version>
+        <elasticsearch.version>6.4.1</elasticsearch.version>
         <scalaj.version>2.3.0</scalaj.version>
         <mongo.version>2.1.0</mongo.version>
-        <scalatest.version>3.0.0</scalatest.version>
-        <slf4j.version>1.7.21</slf4j.version>
-        <log4j.version>1.2.16</log4j.version>
         <curator.version>2.10.0</curator.version>
-        <mockito.version>1.10.19</mockito.version>
-        <mysql.java.version>5.1.47</mysql.java.version>
-        <cassandra.connector.version>2.4.1</cassandra.connector.version>
-        <scalastyle.version>1.0.0</scalastyle.version>
-        <scalafmt.parameters>--diff --test</scalafmt.parameters>
-        <scalafmt.skip>false</scalafmt.skip>
-        <elasticsearch.version>6.4.1</elasticsearch.version>
-        <spark.scope>provided</spark.scope>
+
+        <!-- Testing -->
+        <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
+        <mockito.version>3.2.2.0</mockito.version>
+        <scalatest.version>3.2.3</scalatest.version>
     </properties>
 
     <dependencies>
-        <!--scala-->
+
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>${scala.version}</version>
         </dependency>
 
-        <!--spark, spark streaming, spark hive-->
+        <!-- Spark Dependencies -->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>${spark.scope}</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>${spark.scope}</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>${spark.scope}</scope>
         </dependency>
+
+        <!-- Data Connectors -->
+
+        <!-- Spark Hive-->
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>${spark.scope}</scope>
+        </dependency>
+
+        <!-- Spark Kafka-->
+        <!-- TODO
+        Spark Streaming Kafka 08 is not cross compiled with scala 2.12.
+        Thus, this dependency has been temporarily un-banned in the enforcer plugin.
+        This will be removed with structured streaming implementation
+        -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka-0-8_${scala211.binary.version}</artifactId>
+            <version>${spark-streaming-kafka.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>commons-httpclient</groupId>
-                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>org.scala-lang.modules</groupId>
+                    <artifactId>scala-xml_2.11</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>httpclient</artifactId>
+                    <groupId>org.scala-lang.modules</groupId>
+                    <artifactId>scala-parser-combinators_2.11</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-tags_2.11</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
 
-        <!--jackson-->
+        <!-- MongoDB-->
         <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-            <version>${jackson.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.module</groupId>
-            <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-            <version>${jackson.version}</version>
+            <groupId>org.mongodb.scala</groupId>
+            <artifactId>mongo-scala-driver_${scala.binary.version}</artifactId>
+            <version>${mongo.version}</version>
         </dependency>
 
-        <!--scalaj for http request-->
+        <!-- MySql -->
         <dependency>
-            <groupId>org.scalaj</groupId>
-            <artifactId>scalaj-http_${scala.binary.version}</artifactId>
-            <version>${scalaj.version}</version>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>${mysql.java.version}</version>
         </dependency>
 
-        <!--mongo request-->
+        <!-- MariaDB -->
         <dependency>
-            <groupId>org.mongodb.scala</groupId>
-            <artifactId>mongo-scala-driver_2.11</artifactId>
-            <version>${mongo.version}</version>
+            <groupId>org.mariadb.jdbc</groupId>
+            <artifactId>mariadb-java-client</artifactId>
+            <version>${mariadb.version}</version>
         </dependency>
 
-        <!--avro-->
+        <!-- Spark Cassandra-->
         <dependency>
-            <groupId>com.databricks</groupId>
-            <artifactId>spark-avro_${scala.binary.version}</artifactId>
-            <version>4.0.0</version>
+            <groupId>com.datastax.spark</groupId>
+            <artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
+            <version>${cassandra.connector.version}</version>
         </dependency>
 
-        <!--log4j-->
+        <!-- Spark Elasticsearch-->
+        <!-- TODO
+        Elasticsearch Spark has recently added support for scala 2.12 but it is not published in the Maven Central yet.
+        Thus, this dependency has been temporarily un-banned in the enforcer plugin.
+        See https://github.com/elastic/elasticsearch-hadoop/pull/1589#issuecomment-776920189
+        -->
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>${slf4j.version}</version>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch-spark-20_${scala211.binary.version}</artifactId>
+            <version>${elasticsearch.version}</version>
         </dependency>
 
+        <!-- Miscellaneous-->
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j.version}</version>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${curator.version}</version>
         </dependency>
+
         <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>${log4j.version}</version>
+            <groupId>org.scalaj</groupId>
+            <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+            <version>${scalaj.version}</version>
         </dependency>
 
-        <!--curator-->
         <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <version>${curator.version}</version>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.9</version>
         </dependency>
 
-        <!--junit-->
+        <!-- Test -->
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
+            <scope>test</scope>
         </dependency>
 
-        <!--scala test-->
         <dependency>
             <groupId>org.scalatest</groupId>
             <artifactId>scalatest_${scala.binary.version}</artifactId>
             <version>${scalatest.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-            <version>${mockito.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <groupId>org.scalatestplus</groupId>
+            <artifactId>mockito-3-4_${scala.binary.version}</artifactId>
             <version>${mockito.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <version>${mysql.java.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.datastax.spark</groupId>
-            <artifactId>spark-cassandra-connector_2.11</artifactId>
-            <version>${cassandra.connector.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <version>4.5.9</version>
-        </dependency>
-        <dependency>
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>
             <version>1.4.200</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.elasticsearch</groupId>
-            <artifactId>elasticsearch-spark-20_2.11</artifactId>
-            <version>${elasticsearch.version}</version>
-        </dependency>
+
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>elasticsearch</artifactId>
             <version>1.14.3</version>
             <scope>test</scope>
         </dependency>
+
     </dependencies>
 
     <build>
         <sourceDirectory>src/main/scala</sourceDirectory>
         <testSourceDirectory>src/test/scala</testSourceDirectory>
         <plugins>
+
             <plugin>
                 <groupId>net.alchim31.maven</groupId>
                 <artifactId>scala-maven-plugin</artifactId>
-                <version>3.3.1</version>
+                <version>4.3.0</version>
                 <executions>
                     <execution>
-                        <id>compile</id>
                         <goals>
                             <goal>compile</goal>
                             <goal>testCompile</goal>
                         </goals>
-                        <phase>compile</phase>
                     </execution>
                 </executions>
                 <configuration>
                     <scalaVersion>${scala.version}</scalaVersion>
+                    <checkMultipleScalaVersions>true</checkMultipleScalaVersions>
+                    <failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
+                    <recompileMode>incremental</recompileMode>
                     <args>
+                        <arg>-unchecked</arg>
                         <arg>-deprecation</arg>
                         <arg>-feature</arg>
-                        <arg>-unchecked</arg>
+                        <arg>-explaintypes</arg>
                     </args>
+                    <jvmArgs>
+                        <jvmArg>-Xms64m</jvmArg>
+                        <jvmArg>-Xmx1024m</jvmArg>
+                    </jvmArgs>
+                    <javacArgs>
+                        <javacArg>-source</javacArg>
+                        <javacArg>-target</javacArg>
+                        <javacArg>${java.version}</javacArg>
+                        <javacArg>-Xlint:all,-serial,-path,-try</javacArg>
+                    </javacArgs>
                 </configuration>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-            </plugin>
+
             <plugin>
                 <groupId>org.scalatest</groupId>
                 <artifactId>scalatest-maven-plugin</artifactId>
+                <version>2.0.2</version>
+                <configuration>
+                    <testFailureIgnore>false</testFailureIgnore>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>TestSuiteReport.txt</filereports>
+                    <stderr/>
+                    <systemProperties>
+                        <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+                    </systemProperties>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
             </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
-                <version>3.1.0</version>
-                <configuration>
-                    <createDependencyReducedPom>false</createDependencyReducedPom>
-                </configuration>
+                <version>3.2.4</version>
                 <executions>
                     <execution>
                         <phase>package</phase>
                         <goals>
                             <goal>shade</goal>
                         </goals>
-                        <configuration>
-                            <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>org.apache.griffin.measure.Application</mainClass>
-                                </transformer>
-                            </transformers>
-                            <relocations>
-                                <relocation>
-                                    <pattern>org.apache.http</pattern>
-                                    <shadedPattern>griffin.org.apache.http</shadedPattern>
-                                </relocation>
-                            </relocations>
-                            <filters>
-                                <filter>
-                                    <artifact>*:*</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/maven/**</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
-                        </configuration>
                     </execution>
                 </executions>
+                <configuration>
+                    <createDependencyReducedPom>false</createDependencyReducedPom>
+                    <minimizeJar>true</minimizeJar>
+                    <relocations>
+                        <relocation>
+                            <pattern>org.apache.http</pattern>
+                            <shadedPattern>griffin.org.apache.http</shadedPattern>
+                        </relocation>
+                    </relocations>
+                </configuration>
             </plugin>
+
             <plugin>
                 <groupId>org.antipathy</groupId>
                 <artifactId>mvn-scalafmt_${scala.binary.version}</artifactId>
-                <version>0.12_1.5.1</version>
+                <version>${scalafmt.version}</version>
                 <configuration>
-                    <parameters>${scalafmt.parameters}</parameters>
-                    <skip>${scalafmt.skip}</skip>
-                    <skipSources>${scalafmt.skip}</skipSources>
-                    <skipTestSources>${scalafmt.skip}</skipTestSources>
                     <configLocation>${project.parent.basedir}/.scalafmt.conf</configLocation>
+                    <skipTestSources>false</skipTestSources>
+                    <skipSources>false</skipSources>
+                    <respectVersion>false</respectVersion>
+                    <validateOnly>false</validateOnly>
                 </configuration>
                 <executions>
                     <execution>
-                        <phase>validate</phase>
+                        <phase>compile</phase>
                         <goals>
                             <goal>format</goal>
                         </goals>
                     </execution>
                 </executions>
             </plugin>
+
             <plugin>
                 <groupId>org.scalastyle</groupId>
                 <artifactId>scalastyle-maven-plugin</artifactId>
@@ -331,21 +350,81 @@ under the License.
                 </configuration>
                 <executions>
                     <execution>
-                        <phase>validate</phase>
+                        <phase>compile</phase>
                         <goals>
                             <goal>check</goal>
                         </goals>
                     </execution>
                 </executions>
             </plugin>
+
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+                <version>0.8.2</version>
+                <executions>
+                    <execution>
+                        <id>default-prepare-agent</id>
+                        <goals>
+                            <goal>prepare-agent</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>report</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>report</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scoverage</groupId>
+                <artifactId>scoverage-maven-plugin</artifactId>
+                <version>${scoverage.plugin.version}</version>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <aggregate>true</aggregate>
+                    <highlighting>true</highlighting>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.2.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${project.artifactId}-${project.version}</finalName>
+                            <classifier>spark-${spark.major.version}_scala-${scala.binary.version}</classifier>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.3.0</version>
                 <configuration>
-                    <descriptors>
-                        <descriptor>assembly.xml</descriptor>
-                    </descriptors>
-                    <finalName>${artifactId}-${project.version}</finalName>
+                    <tarLongFileMode>posix</tarLongFileMode>
+                    <finalName>
+                        ${project.artifactId}-${project.version}-spark-${spark.major.version}_scala-${scala.binary.version}
+                    </finalName>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.griffin.measure.Application</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
                 </configuration>
                 <executions>
                     <execution>
@@ -357,6 +436,172 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>3.0.0-M2</version>
+                <executions>
+                    <execution>
+                        <id>enforce-versions</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <configuration>
+                            <rules>
+                                <requireJavaVersion>
+                                    <version>${java.version}</version>
+                                </requireJavaVersion>
+                                <bannedDependencies>
+                                    <excludes>
+                                        <exclude>*:*_2.10</exclude>
+                                        <exclude>*:*_2.12</exclude>
+                                        <exclude>*:*_2.13</exclude>
+                                    </excludes>
+                                    <searchTransitive>true</searchTransitive>
+                                </bannedDependencies>
+                            </rules>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>enforce-no-duplicate-dependencies</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <configuration>
+                            <rules>
+                                <banDuplicatePomDependencyVersions/>
+                            </rules>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
+
+    <profiles>
+        <!-- Scala 2.12 -->
+        <profile>
+            <id>scala-2.12</id>
+            <activation>
+                <property>
+                    <name>scala-2.12</name>
+                </property>
+            </activation>
+
+            <properties>
+                <scala.binary.version>2.12</scala.binary.version>
+                <scala.version>${scala.binary.version}.12</scala.version>
+            </properties>
+
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-enforcer-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>enforce-versions</id>
+                                <goals>
+                                    <goal>enforce</goal>
+                                </goals>
+                                <configuration>
+                                    <rules>
+                                        <requireJavaVersion>
+                                            <version>${java.version}</version>
+                                        </requireJavaVersion>
+                                        <bannedDependencies>
+                                            <!-- TODO Temporary Fix: These will be removed in a future iteration. -->
+                                            <includes>
+                                                <include>org.elasticsearch:elasticsearch-spark-20_2.11:*</include>
+                                                <include>org.apache.spark:spark-streaming-kafka-0-8_2.11:*</include>
+                                                <include>org.apache.kafka:kafka_2.11:*</include>
+                                            </includes>
+                                            <excludes>
+                                                <exclude>*:*_2.10</exclude>
+                                                <exclude>*:*_2.11</exclude>
+                                                <exclude>*:*_2.13</exclude>
+                                            </excludes>
+                                            <searchTransitive>true</searchTransitive>
+                                        </bannedDependencies>
+                                    </rules>
+                                </configuration>
+                            </execution>
+                            <execution>
+                                <id>enforce-no-duplicate-dependencies</id>
+                                <goals>
+                                    <goal>enforce</goal>
+                                </goals>
+                                <configuration>
+                                    <rules>
+                                        <banDuplicatePomDependencyVersions/>
+                                    </rules>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+
+        <!-- Spark 2.3.x -->
+        <profile>
+            <id>spark-2.3</id>
+            <activation>
+                <property>
+                    <name>spark-2.3</name>
+                </property>
+            </activation>
+
+            <properties>
+                <spark.major.version>2.3</spark.major.version>
+                <spark.version>${spark.major.version}.4</spark.version>
+            </properties>
+
+            <dependencies>
+                <!-- Old Spark Avro-->
+                <dependency>
+                    <groupId>com.databricks</groupId>
+                    <artifactId>spark-avro_${scala211.binary.version}</artifactId>
+                    <version>4.0.0</version>
+                </dependency>
+            </dependencies>
+        </profile>
+
+        <!-- Spark 3.0.x -->
+        <profile>
+            <id>spark-3.0</id>
+            <activation>
+                <property>
+                    <name>spark-3.0</name>
+                </property>
+            </activation>
+
+            <properties>
+                <spark.major.version>3.0</spark.major.version>
+                <spark.version>${spark.major.version}.2</spark.version>
+                <spark-streaming-kafka.version>2.4.7</spark-streaming-kafka.version>
+            </properties>
+        </profile>
+
+        <!-- Spark 2.4+ Avro -->
+        <profile>
+            <id>spark-avro-2.4</id>
+            <activation>
+                <property>
+                    <name>!spark-2.3</name>
+                </property>
+            </activation>
+
+            <dependencies>
+                <!-- New Spark Avro-->
+                <dependency>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-avro_${scala.binary.version}</artifactId>
+                    <version>${spark.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+
+    </profiles>
 </project>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index 04775c7..643ff5e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -184,14 +184,15 @@ trait StreamingCacheClient
     }
 
     // new cache data
-    val newDfOpt = try {
-      val dfr = sparkSession.read
-      readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
-    } catch {
-      case e: Throwable =>
-        warn(s"read data source cache warn: ${e.getMessage}")
-        None
-    }
+    val newDfOpt =
+      try {
+        val dfr = sparkSession.read
+        readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
+      } catch {
+        case e: Throwable =>
+          warn(s"read data source cache warn: ${e.getMessage}")
+          None
+      }
 
     // old cache data
     val oldCacheIndexOpt = if (updatable) readOldCacheIndex() else None
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 39827cf..0b08dd4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -32,8 +32,15 @@ import org.apache.griffin.measure.datasource.connector.streaming._
 
 object DataConnectorFactory extends Loggable {
 
-  @deprecated val AvroRegex: Regex = """^(?i)avro$""".r
-  @deprecated val TextDirRegex: Regex = """^(?i)text-dir$""".r
+  @deprecated(
+    "This class is deprecated. Use 'org.apache.griffin.measure.datasource.connector.batch.FileBasedDataConnector' with correct format.",
+    "0.6.0") // message kept as a plain literal: scalac only reports constant @deprecated arguments
+  val AvroRegex: Regex = """^(?i)avro$""".r // case-insensitive match for the connector type "avro"
+
+  @deprecated(
+    "This class is deprecated. Use 'org.apache.griffin.measure.datasource.connector.batch.FileBasedDataConnector' with correct format.",
+    "0.6.0") // message kept as a plain literal: scalac only reports constant @deprecated arguments
+  val TextDirRegex: Regex = """^(?i)text-dir$""".r // case-insensitive match for "text-dir"
 
   val HiveRegex: Regex = """^(?i)hive$""".r
   val FileRegex: Regex = """^(?i)file$""".r
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
index d135f3b..2fd2912 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
@@ -52,33 +52,34 @@ case class CassandraDataConnector(
 
   override def data(ms: Long): (Option[DataFrame], TimeRange) = {
 
-    val dfOpt = try {
-      sparkSession.conf.set("spark.cassandra.connection.host", host)
-      sparkSession.conf.set("spark.cassandra.connection.port", port)
-      sparkSession.conf.set("spark.cassandra.auth.username", user)
-      sparkSession.conf.set("spark.cassandra.auth.password", password)
-
-      val tableDef: DataFrameReader = sparkSession.read
-        .format("org.apache.spark.sql.cassandra")
-        .options(Map("table" -> tableName, "keyspace" -> database))
-
-      val dataWh: String = dataWhere()
-
-      var data: DataFrame = null
-      if (wheres.length > 0) {
-        data = tableDef.load().where(dataWh)
-      } else {
-        data = tableDef.load()
+    val dfOpt =
+      try {
+        sparkSession.conf.set("spark.cassandra.connection.host", host)
+        sparkSession.conf.set("spark.cassandra.connection.port", port)
+        sparkSession.conf.set("spark.cassandra.auth.username", user)
+        sparkSession.conf.set("spark.cassandra.auth.password", password)
+
+        val tableDef: DataFrameReader = sparkSession.read
+          .format("org.apache.spark.sql.cassandra")
+          .options(Map("table" -> tableName, "keyspace" -> database))
+
+        val dataWh: String = dataWhere()
+
+        var data: DataFrame = null
+        if (wheres.length > 0) {
+          data = tableDef.load().where(dataWh)
+        } else {
+          data = tableDef.load()
+        }
+
+        val dfOpt = Some(data)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } catch {
+        case e: Throwable => // any load failure is logged and surfaces as an empty result
+          error(s"load cassandra table $database.$tableName fails: ${e.getMessage}", e)
+          None
       }
-
-      val dfOpt = Some(data)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"load cassandra table $database.$TableName fails: ${e.getMessage}", e)
-        None
-    }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
index dfdacc8..597695e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -77,23 +77,24 @@ case class ElasticSearchGriffinDataConnector(
   def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
     val path: String = s"/_sql?format=csv"
     info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
-    val dfOpt = try {
-      val answer = httpPost(path, sql)
-      if (answer._1) {
-        import sparkSession.implicits._
-        val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
-        val reader: DataFrameReader = sparkSession.read
-        reader.option("header", true).option("inferSchema", true)
-        val df: DataFrame = reader.csv(rdd.toDS())
-        val dfOpt = Some(df)
-        val preDfOpt = preProcess(dfOpt, ms)
-        preDfOpt
-      } else None
-    } catch {
-      case e: Throwable =>
-        error(s"load ES by sql $host:$port $sql  fails: ${e.getMessage}", e)
-        None
-    }
+    val dfOpt =
+      try {
+        val answer = httpPost(path, sql)
+        if (answer._1) {
+          import sparkSession.implicits._
+          val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
+          val reader: DataFrameReader = sparkSession.read
+          reader.option("header", true).option("inferSchema", true)
+          val df: DataFrame = reader.csv(rdd.toDS())
+          val dfOpt = Some(df)
+          val preDfOpt = preProcess(dfOpt, ms)
+          preDfOpt
+        } else None
+      } catch {
+        case e: Throwable =>
+          error(s"load ES by sql $host:$port $sql  fails: ${e.getMessage}", e)
+          None
+      }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
@@ -102,44 +103,44 @@ case class ElasticSearchGriffinDataConnector(
     val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
     info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
 
-    val dfOpt = try {
-      val answer = httpGet(path)
-      val data = ArrayBuffer[Map[String, Number]]()
-
-      if (answer._1) {
-        val arrayAnswers: util.Iterator[JsonNode] =
-          parseString(answer._2).get("hits").get("hits").elements()
-
-        while (arrayAnswers.hasNext) {
-          val answer = arrayAnswers.next()
-          val values = answer.get("_source").get("value")
-          val fields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
-          val fieldsMap = mutable.Map[String, Number]()
-          while (fields.hasNext) {
-            val fld: util.Map.Entry[String, JsonNode] = fields.next()
-            fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+    val dfOpt =
+      try {
+        val answer = httpGet(path)
+        val data = ArrayBuffer[Map[String, Number]]()
+
+        if (answer._1) {
+          val arrayAnswers: util.Iterator[JsonNode] =
+            parseString(answer._2).get("hits").get("hits").elements()
+
+          while (arrayAnswers.hasNext) {
+            val answer = arrayAnswers.next()
+            val values = answer.get("_source").get("value")
+            val fields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
+            val fieldsMap = mutable.Map[String, Number]()
+            while (fields.hasNext) {
+              val fld: util.Map.Entry[String, JsonNode] = fields.next()
+              fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+            }
+            data += fieldsMap.toMap
           }
-          data += fieldsMap.toMap
         }
+        val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
+        val columns: Array[String] = fields.toArray
+        val defaultNumber: Number = 0.0
+        val rdd: RDD[Row] = rdd1
+          .map { x: Map[String, Number] =>
+            Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
+          }
+        val schema = dfSchema(columns.toList)
+        val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
+        val dfOpt = Some(df)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } catch {
+        case e: Throwable =>
+          error(s"load ES table $host:$port $index/$dataType  fails: ${e.getMessage}", e)
+          None
       }
-      val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
-      val columns: Array[String] = fields.toArray
-      val defaultNumber: Number = 0.0
-      val rdd: RDD[Row] = rdd1
-        .map { x: Map[String, Number] =>
-          Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
-        }
-      val schema = dfSchema(columns.toList)
-      val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
-      df.show(20)
-      val dfOpt = Some(df)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"load ES table $host:$port $index/$dataType  fails: ${e.getMessage}", e)
-        None
-    }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
index 86049bd..2e4f482 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -79,7 +79,14 @@ case class FileBasedDataConnector(
     SupportedFormats.contains(format),
     s"Invalid format '$format' specified. Must be one of ${SupportedFormats.mkString("['", "', '", "']")}")
 
-  if (format == "csv") validateCSVOptions()
+  if (format.equalsIgnoreCase("avro") && sparkSession.version < "2.4.0") {
+    format = "com.databricks.spark.avro" // pre-2.4 builds bundle Databricks spark-avro (see spark-2.3 profile)
+  }
+
+  if (format == "csv") {
+    validateCSVOptions()
+  }
+
   if (format == "tsv") {
     format = "csv"
     options.getOrElseUpdate(Delimiter, TabDelimiter)
@@ -169,7 +176,8 @@ object FileBasedDataConnector extends Loggable {
   private val TabDelimiter: String = "\t"
 
   private val DefaultFormat: String = SQLConf.DEFAULT_DATA_SOURCE_NAME.defaultValueString
-  private val SupportedFormats: Seq[String] = Seq("parquet", "orc", "avro", "text", "csv", "tsv")
+  private val SupportedFormats: Seq[String] =
+    Seq("parquet", "orc", "avro", "text", "csv", "tsv", "com.databricks.spark.avro")
 
   /**
    * Validates the existence of paths in a given sequence.
@@ -189,7 +197,7 @@ object FileBasedDataConnector extends Loggable {
           else throw new IllegalArgumentException(msg)
 
           false
-      })
+        })
 
     assert(validPaths.nonEmpty, "No paths were given for the data source.")
     validPaths
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
index 9fc21d7..2f77db3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
@@ -69,27 +69,28 @@ case class JDBCBasedDataConnector(
   assert(isJDBCDriverLoaded(driver), s"JDBC driver $driver not present in classpath")
 
   override def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
-      val dtSql = createSqlStmt()
-      val prop = new java.util.Properties
-      prop.setProperty("user", user)
-      prop.setProperty("password", password)
-      prop.setProperty("driver", driver)
-      val dfOpt = Try(sparkSession.read.jdbc(url, s"($dtSql) as t", prop))
+    val dfOpt =
+      try {
+        val dtSql = createSqlStmt()
+        val prop = new java.util.Properties
+        prop.setProperty("user", user)
+        prop.setProperty("password", password)
+        prop.setProperty("driver", driver)
+        val dfOpt = Try(sparkSession.read.jdbc(url, s"($dtSql) as t", prop))
 
-      dfOpt match {
-        case Success(_) =>
-        case Failure(exception) =>
-          griffinLogger.error("Error occurred while reading data set.", exception)
-      }
+        dfOpt match {
+          case Success(_) =>
+          case Failure(exception) =>
+            griffinLogger.error("Error occurred while reading data set.", exception)
+        }
 
-      val preDfOpt = preProcess(dfOpt.toOption, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"loading table $fullTableName fails: ${e.getMessage}", e)
-        None
-    }
+        val preDfOpt = preProcess(dfOpt.toOption, ms)
+        preDfOpt
+      } catch {
+        case e: Throwable =>
+          error(s"loading table $fullTableName fails: ${e.getMessage}", e)
+          None
+      }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
@@ -99,7 +100,7 @@ case class JDBCBasedDataConnector(
    */
   private def createSqlStmt(): String = {
     val tableClause = s"SELECT * FROM $fullTableName"
-    if (whereString.length > 0) {
+    if (whereString.nonEmpty) {
       s"$tableClause WHERE $whereString"
     } else tableClause
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
index feacfc9..26321fc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -52,21 +52,22 @@ case class MySqlDataConnector(
 
   override def data(ms: Long): (Option[DataFrame], TimeRange) = {
 
-    val dfOpt = try {
-      val dtSql = dataSql()
-      val prop = new java.util.Properties
-      prop.setProperty("user", user)
-      prop.setProperty("password", password)
-      prop.setProperty("driver", driver)
-      val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
-      val dfOpt = Some(df)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable =>
-        error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
-        None
-    }
+    val dfOpt =
+      try {
+        val dtSql = dataSql()
+        val prop = new java.util.Properties
+        prop.setProperty("user", user)
+        prop.setProperty("password", password)
+        prop.setProperty("driver", driver)
+        val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
+        val dfOpt = Some(df)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } catch {
+        case e: Throwable =>
+          error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
+          None
+      }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 0c501dc..b0be557 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -51,23 +51,24 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
     }
     ds.foreachRDD((rdd, time) => {
       val ms = time.milliseconds
-      val saveDfOpt = try {
-        // coalesce partition number
-        val prlCount = rdd.sparkContext.defaultParallelism
-        val ptnCount = rdd.getNumPartitions
-        val repartitionedRdd = if (prlCount < ptnCount) {
-          rdd.coalesce(prlCount)
-        } else rdd
-
-        val dfOpt = transform(repartitionedRdd)
-
-        // pre-process
-        preProcess(dfOpt, ms)
-      } catch {
-        case e: Throwable =>
-          error(s"streaming data connector error: ${e.getMessage}")
-          None
-      }
+      val saveDfOpt =
+        try {
+          // coalesce partition number
+          val prlCount = rdd.sparkContext.defaultParallelism
+          val ptnCount = rdd.getNumPartitions
+          val repartitionedRdd = if (prlCount < ptnCount) {
+            rdd.coalesce(prlCount)
+          } else rdd
+
+          val dfOpt = transform(repartitionedRdd)
+
+          // pre-process
+          preProcess(dfOpt, ms)
+        } catch {
+          case e: Throwable =>
+            error(s"streaming data connector error: ${e.getMessage}")
+            None
+        }
 
       // save data frame
       streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
index 743bba9..9aa27f7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
@@ -43,7 +43,7 @@ trait BasicAnalyzer extends Serializable {
             v :+ se
           case _ => v
         }
-    }
+      }
   val combSelectionExprs: (Seq[SelectionExpr], Seq[SelectionExpr]) => Seq[SelectionExpr] =
     (a: Seq[SelectionExpr], b: Seq[SelectionExpr]) => a ++ b
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 4f4bf99..980bded 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -33,20 +33,21 @@ case class MetricFlushStep() extends WriteStep {
   def execute(context: DQContext): Try[Boolean] = Try {
     context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
       val (t, metric) = pair
-      val pr = try {
-        context.getSinks(t).foreach { sink =>
-          try {
-            sink.sinkMetrics(metric)
-          } catch {
-            case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+      val pr =
+        try {
+          context.getSinks(t).foreach { sink =>
+            try {
+              sink.sinkMetrics(metric)
+            } catch {
+              case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+            }
           }
+          true
+        } catch {
+          case e: Throwable =>
+            error(s"flush metrics error: ${e.getMessage}", e)
+            false
         }
-        true
-      } catch {
-        case e: Throwable =>
-          error(s"flush metrics error: ${e.getMessage}", e)
-          false
-      }
       ret && pr
     }
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
index b7f50e8..680f7a8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
@@ -38,13 +38,14 @@ object FSUtil extends Loggable {
         fsMap.get(uri.getScheme) match {
           case Some(fs) => fs
           case _ =>
-            val fs = try {
-              FileSystem.get(uri, getConfiguration)
-            } catch {
-              case e: Throwable =>
-                error(s"get file system error: ${e.getMessage}", e)
-                throw e
-            }
+            val fs =
+              try {
+                FileSystem.get(uri, getConfiguration)
+              } catch {
+                case e: Throwable =>
+                  error(s"get file system error: ${e.getMessage}", e)
+                  throw e
+              }
             fsMap += (uri.getScheme -> fs)
             fs
         }
@@ -60,11 +61,12 @@ object FSUtil extends Loggable {
   }
 
   private def getUriOpt(path: String): Option[URI] = {
-    val uriOpt = try {
-      Some(new URI(path))
-    } catch {
-      case _: Throwable => None
-    }
+    val uriOpt =
+      try {
+        Some(new URI(path))
+      } catch {
+        case _: Throwable => None
+      }
     uriOpt.flatMap { uri =>
       if (uri.getScheme == null) {
         try {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
index 52c5f38..bc1e292 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
@@ -21,10 +21,6 @@ import java.util.concurrent._
 
 import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
 import scala.concurrent.duration.Duration
-import scala.concurrent.forkjoin.{
-  ForkJoinPool => SForkJoinPool,
-  ForkJoinWorkerThread => SForkJoinWorkerThread
-}
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
@@ -172,15 +168,15 @@ private[griffin] object ThreadUtils {
   /**
    * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
    */
-  def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+  def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
     // Custom factory to set thread names
-    val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
-      override def newThread(pool: SForkJoinPool): SForkJoinWorkerThread =
-        new SForkJoinWorkerThread(pool) {
+    val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
+      override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
+        new ForkJoinWorkerThread(pool) {
           setName(prefix + "-" + super.getName)
         }
     }
-    new SForkJoinPool(
+    new ForkJoinPool(
       maxThreadNumber,
       factory,
       null, // handler
diff --git a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
index dbed89c..011122f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
@@ -22,9 +22,10 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SparkSession
-import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
 
-trait SparkSuiteBase extends FlatSpec with BeforeAndAfterAll {
+trait SparkSuiteBase extends AnyFlatSpec with BeforeAndAfterAll {
 
   @transient var spark: SparkSession = _
   @transient var sc: SparkContext = _
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
index efaa91f..8f191bc 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
@@ -17,7 +17,9 @@
 
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest._
+import flatspec.AnyFlatSpec
+import matchers.should._
 
 import org.apache.griffin.measure.configuration.dqdefinition.{
   DQConfig,
@@ -26,7 +28,7 @@ import org.apache.griffin.measure.configuration.dqdefinition.{
   RuleParam
 }
 
-class ParamEnumReaderSpec extends FlatSpec with Matchers {
+class ParamEnumReaderSpec extends AnyFlatSpec with Matchers {
   import org.apache.griffin.measure.configuration.enums.DslType._
   "dsltype" should "be parsed to predefined set of values" in {
     val validDslSparkSqlValues =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index a05df90..94c7e07 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -22,8 +22,10 @@ import scala.util.{Failure, Success}
 
 import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
 import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
-
-class ParamFileReaderSpec extends FlatSpec with Matchers {
+import org.scalatest._
+import flatspec.AnyFlatSpec
+import matchers.should._
+class ParamFileReaderSpec extends AnyFlatSpec with Matchers {
 
   "params " should "be parsed from a valid file" in {
     val reader: ParamReader =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index 86d68b5..d8bf125 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -18,14 +18,15 @@
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
 import scala.io.Source
-
-import org.scalatest.{FlatSpec, Matchers}
 import scala.util.{Failure, Success}
 
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
 import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
 import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
 
-class ParamJsonReaderSpec extends FlatSpec with Matchers {
+class ParamJsonReaderSpec extends AnyFlatSpec with Matchers {
 
   "params " should "be parsed from a valid file" in {
     val bufferedSource =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
index 898c8fd..6f5717a 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
@@ -19,11 +19,12 @@ package org.apache.griffin.measure.context
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.types._
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.SparkSuiteBase
 
-class DataFrameCacheTest extends FlatSpec with Matchers with SparkSuiteBase {
+class DataFrameCacheTest extends AnyFlatSpec with Matchers with SparkSuiteBase {
 
   def createDataFrame(arr: Seq[Int]): DataFrame = {
     val schema = StructType(
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
index 7d25cb1..9a63abd 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
@@ -17,9 +17,10 @@
 
 package org.apache.griffin.measure.context
 
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
-class MetricWrapperTest extends FlatSpec with Matchers {
+class MetricWrapperTest extends AnyFlatSpec with Matchers {
 
   "metric wrapper" should "flush empty if no metric inserted" in {
     val metricWrapper = MetricWrapper("name", "appId")
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
index 28c195b..9648d96 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
@@ -17,9 +17,10 @@
 
 package org.apache.griffin.measure.context
 
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
-class TimeRangeTest extends FlatSpec with Matchers {
+class TimeRangeTest extends AnyFlatSpec with Matchers {
 
   "time range" should "be able to merge another time range" in {
     val tr1 = TimeRange(1, 10, Set(2, 5, 8))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
index a90768e..c60b439 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
@@ -17,9 +17,10 @@
 
 package org.apache.griffin.measure.datasource
 
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
-class TimestampStorageTest extends FlatSpec with Matchers {
+class TimestampStorageTest extends AnyFlatSpec with Matchers {
 
   "timestamp storage" should "be able to insert a timestamp" in {
     val timestampStorage = TimestampStorage()
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
index 466196b..fd4e6a5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.InputDStream
-import org.scalatest.FlatSpec
+import org.scalatest.flatspec.AnyFlatSpec
 
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
@@ -75,7 +75,7 @@ class DataConnectorWithoutApply extends BatchDataConnector {
   override def data(ms: Long): (Option[DataFrame], TimeRange) = null
 }
 
-class DataConnectorFactorySpec extends FlatSpec {
+class DataConnectorFactorySpec extends AnyFlatSpec {
 
   "DataConnectorFactory" should "be able to create custom batch connector" in {
     val param = DataConnectorParam(
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
index 60c50ca..37c63fc 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
@@ -18,13 +18,15 @@
 package org.apache.griffin.measure.datasource.connector.batch
 
 import org.apache.spark.sql.types.StructType
-import org.scalatest.Matchers
+import org.scalatest.matchers.should._
+import org.scalatest.Ignore
 import org.testcontainers.elasticsearch.ElasticsearchContainer
 
 import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.datasource.TimestampStorage
 
+@Ignore
 class ElasticSearchDataConnectorTest extends SparkSuiteBase with Matchers {
 
   // ignorance flag that could skip cases
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
index 2493df3..2ae65be 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -18,7 +18,7 @@
 package org.apache.griffin.measure.datasource.connector.batch
 
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
-import org.scalatest._
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
@@ -65,13 +65,12 @@ class FileBasedDataConnectorTest extends SparkSuiteBase with Matchers {
     // valid schema
     val result1 = FileBasedDataConnector(
       spark,
-      dcParam.copy(
-        config = configs + (
-          (
-            "schema",
-            Seq(
-              Map("name" -> "name", "type" -> "string"),
-              Map("name" -> "age", "type" -> "int", "nullable" -> "true"))))),
+      dcParam.copy(config = configs + (
+        (
+          "schema",
+          Seq(
+            Map("name" -> "name", "type" -> "string"),
+            Map("name" -> "age", "type" -> "int", "nullable" -> "true"))))),
       timestampStorage)
       .data(1L)
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
index 77040e3..8a755b6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
@@ -19,7 +19,7 @@ package org.apache.griffin.measure.datasource.connector.batch
 import java.sql.DriverManager
 import java.util.Properties
 
-import org.scalatest.Matchers
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index a557dda..9fc9883 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -19,7 +19,7 @@ package org.apache.griffin.measure.job
 
 import scala.util.{Failure, Success}
 
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
 import org.apache.griffin.measure.Application._
@@ -30,12 +30,7 @@ import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
 
-class DQAppTest
-    extends FlatSpec
-    with SparkSuiteBase
-    with BeforeAndAfterAll
-    with Matchers
-    with Loggable {
+class DQAppTest extends SparkSuiteBase with Matchers with Loggable {
 
   var envParam: EnvConfig = _
   var sparkParam: SparkParam = _
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
index e4754e0..8bf81b1 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -57,7 +57,7 @@ class CustomSinkTest extends SinkTestBase {
       }
       sinks.headOption match {
         case Some(sink: CustomSink) => sink.allMetrics
-        case _ => mutable.ListBuffer[String]()
+        case _ => Map.empty
       }
     })
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
index 919183b..b13858f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
@@ -19,14 +19,15 @@ package org.apache.griffin.measure.sink
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.types._
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext}
 
-trait SinkTestBase extends FlatSpec with Matchers with SparkSuiteBase with Loggable {
+trait SinkTestBase extends AnyFlatSpec with Matchers with SparkSuiteBase with Loggable {
 
   var sinkParams: Seq[SinkParam]
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index 834d8e0..5892c11 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -17,15 +17,17 @@
 
 package org.apache.griffin.measure.step
 
-import org.scalatest._
 import scala.util.Try
 
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
 import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
 import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext}
 import org.apache.griffin.measure.step.transform.TransformStep
 
-class TransformStepTest extends FlatSpec with Matchers with SparkSuiteBase with Loggable {
+class TransformStepTest extends AnyFlatSpec with Matchers with SparkSuiteBase with Loggable {
 
   case class DualTransformStep(
       name: String,
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
index 67e5236..f80fb13 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
@@ -17,15 +17,15 @@
 
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.scalatest._
-import org.scalatest.mockito.MockitoSugar
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+import org.scalatestplus.mockito.MockitoSugar
 
-import org.apache.griffin.measure.configuration.dqdefinition.RuleErrorConfParam
-import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.dqdefinition.{RuleErrorConfParam, RuleParam}
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.builder.dsl.expr.Expr
 
-class CompletenessExpr2DQStepsTest extends FlatSpec with Matchers with MockitoSugar {
+class CompletenessExpr2DQStepsTest extends AnyFlatSpec with Matchers with MockitoSugar {
 
   "CompletenessExpr2DQSteps" should "get correct where clause" in {
     val completeness = CompletenessExpr2DQSteps(mock[DQContext], mock[Expr], mock[RuleParam])
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
index 2f06ad3..5b019a5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -18,7 +18,8 @@
 package org.apache.griffin.measure.transformations
 
 import org.apache.spark.sql.DataFrame
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition._
@@ -29,7 +30,10 @@ import org.apache.griffin.measure.job.builder.DQJobBuilder
 
 case class AccuracyResult(total: Long, miss: Long, matched: Long, matchedFraction: Double)
 
-class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with SparkSuiteBase {
+class AccuracyTransformationsIntegrationTest
+    extends AnyFlatSpec
+    with Matchers
+    with SparkSuiteBase {
   private val EMPTY_PERSON_TABLE = "empty_person"
   private val PERSON_TABLE = "person"
 
@@ -80,10 +84,9 @@ class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with
       sourceName: String,
       targetName: String,
       expectedResult: AccuracyResult) = {
-    val dqContext: DQContext = getDqContext(
-      dataSourcesParam = List(
-        DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
-        DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
+    val dqContext: DQContext = getDqContext(dataSourcesParam = List(
+      DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
+      DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
 
     val accuracyRule = RuleParam(
       dslType = "griffin-dsl",
diff --git a/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
index 720f9b2..9f204a8 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
@@ -17,11 +17,13 @@
 
 package org.apache.griffin.measure.utils
 
-import org.scalatest._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
 
 import org.apache.griffin.measure.utils.ParamUtil._
 
-class ParamUtilTest extends FlatSpec with Matchers with BeforeAndAfter {
+class ParamUtilTest extends AnyFlatSpec with Matchers with BeforeAndAfter {
 
   val fruits: Map[String, Any] =
     Map[String, Any]("A" -> "apple", "B" -> "banana", "O" -> "orange")
diff --git a/pom.xml b/pom.xml
index 964cf11..e51c94e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,13 +40,23 @@ under the License.
     </prerequisites>
 
     <properties>
+        <encoding>UTF-8</encoding>
+        <project.build.sourceEncoding>${encoding}</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>${encoding}</project.reporting.outputEncoding>
+
         <java.version>1.8</java.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala211.binary.version>2.11</scala211.binary.version>
+        <scala.version>${scala.binary.version}.0</scala.version>
+
+        <maven.compiler.source>${java.version}</maven.compiler.source>
+        <maven.compiler.target>${java.version}</maven.compiler.target>
+
         <maven-apache-rat.version>0.11</maven-apache-rat.version>
         <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <maven.compiler.source>1.8</maven.compiler.source>
-        <maven.compiler.target>1.8</maven.compiler.target>
+
+        <compile.scope>compile</compile.scope>
+        <provided.scope>provided</provided.scope>
     </properties>
 
     <licenses>
@@ -109,7 +119,7 @@ under the License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-compiler-plugin</artifactId>
-                    <version>3.6.1</version>
+                    <version>3.8.1</version>
                     <configuration>
                         <source>${maven.compiler.source}</source>
                         <target>${maven.compiler.target}</target>
@@ -119,28 +129,16 @@ under the License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-surefire-plugin</artifactId>
-                    <version>2.22.0</version>
+                    <version>3.0.0-M3</version>
                     <configuration>
+                        <testFailureIgnore>false</testFailureIgnore>
+                        <useSystemClassLoader>true</useSystemClassLoader>
+                        <forkMode>once</forkMode>
+                        <failIfNoTests>false</failIfNoTests>
                         <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                        <argLine>-ea -Xmx1g -Xss4m -XX:ReservedCodeCacheSize=128m</argLine>
-                    </configuration>
-                    <executions>
-                        <execution>
-                            <id>test</id>
-                            <goals>
-                                <goal>test</goal>
-                            </goals>
-                        </execution>
-                    </executions>
-                </plugin>
-                <!-- Scalatest runs all Scala tests -->
-                <!-- enable scalatest -->
-                <plugin>
-                    <groupId>org.scalatest</groupId>
-                    <artifactId>scalatest-maven-plugin</artifactId>
-                    <version>1.0</version>
-                    <configuration>
-                        <argLine>-ea -Xmx1g -Xss4m -XX:ReservedCodeCacheSize=128m</argLine>
+                        <systemProperties>
+                            <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+                        </systemProperties>
                     </configuration>
                     <executions>
                         <execution>
@@ -151,6 +149,7 @@ under the License.
                         </execution>
                     </executions>
                 </plugin>
+
                 <plugin>
                     <groupId>org.apache.rat</groupId>
                     <artifactId>apache-rat-plugin</artifactId>
diff --git a/service/pom.xml b/service/pom.xml
index a1823a3..8121998 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -34,7 +34,6 @@ under the License.
     <properties>
         <hadoop.version>2.7.1</hadoop.version>
         <hive.version>2.2.0</hive.version>
-        <scala.version>2.10</scala.version>
         <spring.boot.version>2.1.7.RELEASE</spring.boot.version>
         <spring.security.kerberos.version>1.0.1.RELEASE</spring.security.kerberos.version>
         <confluent.version>3.2.0</confluent.version>
@@ -258,7 +257,7 @@ under the License.
         <!--livy-core-->
         <dependency>
             <groupId>com.cloudera.livy</groupId>
-            <artifactId>livy-core_${scala.version}</artifactId>
+            <artifactId>livy-core_2.10</artifactId>
             <version>${livy.core.version}</version>
         </dependency>