Posted to commits@griffin.apache.org by ch...@apache.org on 2021/03/09 09:03:23 UTC
[griffin] branch master updated: [GRIFFIN-345] Support
cross-version compilation for Scala and Spark dependencies
This is an automated email from the ASF dual-hosted git repository.
chitralverma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 076c8d0 [GRIFFIN-345] Support cross-version compilation for Scala and Spark dependencies
076c8d0 is described below
commit 076c8d08abf9f825dde22213e35e1b24f50cb1f2
Author: chitralverma <ch...@gmail.com>
AuthorDate: Tue Mar 9 14:33:02 2021 +0530
[GRIFFIN-345] Support cross-version compilation for Scala and Spark dependencies
**What changes were proposed in this pull request?**
_This PR affects only the measure module._
In newer environments, especially cloud platforms, the Griffin measure module may face compatibility issues due to its old Scala and Spark versions. To remedy this, the following topics are covered in this ticket:
- Cross-compilation across scala major versions (2.11, 2.12)
- Update Spark Version (2.4+)
- Create maven profiles to build different scala and spark versions
- Changes to build strategy
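The Maven profiles above translate into build invocations like the following (the profile flags shown here are the ones documented in the measure build guide added by this commit):

```shell
# Default build: Scala 2.11 and Spark 2.4.x
mvn clean package

# Cross-build for Scala 2.12 with Spark 2.4.x
mvn clean package -Dscala-2.12

# Cross-build for Scala 2.12 with Spark 3.0.x
mvn clean package -Dscala-2.12 -Dspark-3.0
```

Note that `-Dscala-2.12 -Dspark-2.3` is not a supported combination, as that dependency set is not cross-compiled.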
A similar process is used in Apache Spark to build against different versions of Scala and Hadoop.
**Does this PR introduce any user-facing change?**
No
**How was this patch tested?**
Via the Maven build process.
Closes #589 from chitralverma/cross-version-build.
Authored-by: chitralverma <ch...@gmail.com>
Signed-off-by: chitralverma <ch...@gmail.com>
---
.gitignore | 3 +-
griffin-doc/deploy/measure-build-guide.md | 74 +++
measure/assembly.xml | 64 ---
measure/pom.xml | 527 +++++++++++++++------
.../datasource/cache/StreamingCacheClient.scala | 17 +-
.../connector/DataConnectorFactory.scala | 11 +-
.../connector/batch/CassandraDataConnector.scala | 53 ++-
.../batch/ElasticSearchGriffinDataConnector.scala | 105 ++--
.../connector/batch/FileBasedDataConnector.scala | 14 +-
.../connector/batch/JDBCBasedDataConnector.scala | 41 +-
.../connector/batch/MySqlDataConnector.scala | 31 +-
.../streaming/KafkaStreamingDataConnector.scala | 35 +-
.../dsl/transform/analyzer/BasicAnalyzer.scala | 2 +-
.../measure/step/write/MetricFlushStep.scala | 25 +-
.../org/apache/griffin/measure/utils/FSUtil.scala | 26 +-
.../apache/griffin/measure/utils/ThreadUtils.scala | 14 +-
.../apache/griffin/measure/SparkSuiteBase.scala | 5 +-
.../dqdefinition/reader/ParamEnumReaderSpec.scala | 6 +-
.../dqdefinition/reader/ParamFileReaderSpec.scala | 6 +-
.../dqdefinition/reader/ParamJsonReaderSpec.scala | 7 +-
.../measure/context/DataFrameCacheTest.scala | 5 +-
.../measure/context/MetricWrapperTest.scala | 5 +-
.../griffin/measure/context/TimeRangeTest.scala | 5 +-
.../measure/datasource/TimestampStorageTest.scala | 5 +-
.../connector/DataConnectorFactorySpec.scala | 4 +-
.../batch/ElasticSearchDataConnectorTest.scala | 4 +-
.../batch/FileBasedDataConnectorTest.scala | 15 +-
.../batch/JDBCBasedDataConnectorTest.scala | 2 +-
.../org/apache/griffin/measure/job/DQAppTest.scala | 9 +-
.../griffin/measure/sink/CustomSinkTest.scala | 2 +-
.../apache/griffin/measure/sink/SinkTestBase.scala | 5 +-
.../griffin/measure/step/TransformStepTest.scala | 6 +-
.../transform/CompletenessExpr2DQStepsTest.scala | 10 +-
.../AccuracyTransformationsIntegrationTest.scala | 15 +-
.../griffin/measure/utils/ParamUtilTest.scala | 6 +-
pom.xml | 49 +-
service/pom.xml | 3 +-
37 files changed, 752 insertions(+), 464 deletions(-)
diff --git a/.gitignore b/.gitignore
index cd6b316..ef3e99e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ target/**
*.jar
*.war
*.ear
+*.cfg
target
service/src/main/resources/public/
ui/.tmp/
@@ -21,7 +22,7 @@ ui/.tmp/
.settings/
.classpath
bin
-
+**/site-packages/**
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
# don't need binary
diff --git a/griffin-doc/deploy/measure-build-guide.md b/griffin-doc/deploy/measure-build-guide.md
new file mode 100644
index 0000000..8914bc6
--- /dev/null
+++ b/griffin-doc/deploy/measure-build-guide.md
@@ -0,0 +1,74 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Griffin Build Guide - Measure Module
+
+Like the other modules of Apache Griffin, the `measure` module is built with Maven. Building the `measure` module
+requires Maven 3.5+ and Java 8.
+
+## Version Compatibility
+
+Starting with Apache Griffin 0.7, the `measure` module is cross-built against multiple Scala and Spark versions. Since
+both Scala and Spark are dependencies of Apache Griffin, the supported Spark-Scala combinations are listed below:
+
+| | Spark 2.3.x | Spark 2.4.x | Spark 3.0.x |
+| --------- |:-----------:|:-----------:|:-----------:|
+| Scala 2.11| ✓ | ✓ | x |
+| Scala 2.12| x | ✓ | ✓ |
+
+## Building a Distribution
+
+Execute the commands below to build `measure` with the desired versions of Spark and Scala.
+
+By default, the build compiles against Scala 2.11 and Apache Spark 2.4.x.
+
+```
+# For measure module with Scala 2.11 and Spark 2.4.x
+mvn clean package
+```
+
+To change the Scala or Spark version, use one of the commands below:
+
+```
+# For measure module with Scala 2.12 and Spark 2.4.x
+mvn clean package -Dscala-2.12
+```
+
+```
+# For measure module with Scala 2.11 and Spark 2.3.x
+mvn clean package -Dspark-2.3
+```
+
+```
+# For measure module with Scala 2.12 and Spark 3.0.x
+mvn clean package -Dscala-2.12 -Dspark-3.0
+```
+
+Note:
+ - Using the `-Dscala-2.12` and `-Dspark-2.3` options together will cause a build failure due to missing
+dependencies, as this combination is not cross-compiled; see [Version Compatibility](#version-compatibility).
+ - Using the `-Dspark-3.0` option without `-Dscala-2.12` will cause a build failure due to missing
+ dependencies, as this combination is not cross-compiled; see [Version Compatibility](#version-compatibility).
+
+### AVRO Source Support
+
+Starting with Spark 2.4.x, the AVRO source (`spark-avro` package) was migrated from the `com.databricks` group to
+`org.apache.spark`. Additionally, the older dependency does not support Scala 2.12.
+
+All builds of the `measure` module contain AVRO source support by default.
diff --git a/measure/assembly.xml b/measure/assembly.xml
deleted file mode 100644
index 7890497..0000000
--- a/measure/assembly.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
- <id>package</id>
- <formats>
- <format>tar.gz</format>
- </formats>
- <fileSets>
- <fileSet>
- <directory>${project.basedir}/sbin</directory>
- <outputDirectory>/bin</outputDirectory>
- <includes>
- <include>/**</include>
- </includes>
- <lineEnding>unix</lineEnding>
- <fileMode>0777</fileMode>
- <directoryMode>0755</directoryMode>
- </fileSet>
- <fileSet>
- <directory>${project.basedir}/src/main/resources</directory>
- <outputDirectory>/conf</outputDirectory>
- <includes>
- <include>/**</include>
- </includes>
- </fileSet>
- </fileSets>
-
- <dependencySets>
- <dependencySet>
- <useProjectArtifact>true</useProjectArtifact>
- <outputDirectory>/lib</outputDirectory>
- <unpack>false</unpack>
- <scope>provided</scope>
- <excludes>
- <exclude>com.twitter:parquet-hadoop-bundle</exclude>
- <exclude>io.dropwizard.metrics:*</exclude>
- <exclude>org.glassfish.jersey.core:*</exclude>
- <exclude>org.glassfish.jersey.containers:*</exclude>
- <exclude>org.apache.thrift:*</exclude>
- <exclude>org.apache.parquet:*</exclude>
- <exclude>org.apache.hadoop:*</exclude>
- <exclude>org.apache.spark:*</exclude>
- </excludes>
- </dependencySet>
- </dependencySets>
-</assembly>
diff --git a/measure/pom.xml b/measure/pom.xml
index 5408d37..3cd8bc6 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -17,7 +17,8 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -29,290 +30,308 @@ under the License.
<artifactId>measure</artifactId>
<packaging>jar</packaging>
- <name>Apache Griffin :: Measures</name>
- <url>http://maven.apache.org</url>
+ <name>Apache Griffin :: Measure</name>
+ <url>http://griffin.apache.org</url>
<properties>
- <scala.version>2.11.8</scala.version>
- <spark.version>2.2.1</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
- <avro.version>1.7.7</avro.version>
- <jackson.version>2.8.7</jackson.version>
+
+ <!-- Spark -->
+ <spark.major.version>2.4</spark.major.version>
+ <spark.version>${spark.major.version}.4</spark.version>
+ <spark.scope>${provided.scope}</spark.scope>
+ <spark-streaming-kafka.version>${spark.version}</spark-streaming-kafka.version>
+
+ <!-- Code Standardization -->
+ <scalastyle.version>1.0.0</scalastyle.version>
+ <scalafmt.version>1.0.3</scalafmt.version>
+
+ <!-- Data Connectors -->
+ <mysql.java.version>5.1.47</mysql.java.version>
+ <mariadb.version>2.7.0</mariadb.version>
+ <cassandra.connector.version>2.5.1</cassandra.connector.version>
+ <elasticsearch.version>6.4.1</elasticsearch.version>
<scalaj.version>2.3.0</scalaj.version>
<mongo.version>2.1.0</mongo.version>
- <scalatest.version>3.0.0</scalatest.version>
- <slf4j.version>1.7.21</slf4j.version>
- <log4j.version>1.2.16</log4j.version>
<curator.version>2.10.0</curator.version>
- <mockito.version>1.10.19</mockito.version>
- <mysql.java.version>5.1.47</mysql.java.version>
- <cassandra.connector.version>2.4.1</cassandra.connector.version>
- <scalastyle.version>1.0.0</scalastyle.version>
- <scalafmt.parameters>--diff --test</scalafmt.parameters>
- <scalafmt.skip>false</scalafmt.skip>
- <elasticsearch.version>6.4.1</elasticsearch.version>
- <spark.scope>provided</spark.scope>
+
+ <!-- Testing -->
+ <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
+ <mockito.version>3.2.2.0</mockito.version>
+ <scalatest.version>3.2.3</scalatest.version>
</properties>
<dependencies>
- <!--scala-->
+
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
- <!--spark, spark streaming, spark hive-->
+ <!-- Spark Dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${spark.scope}</scope>
</dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${spark.scope}</scope>
</dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${spark.scope}</scope>
</dependency>
+
+ <!-- Data Connectors -->
+
+ <!-- Spark Hive-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>${spark.scope}</scope>
+ </dependency>
+
+ <!-- Spark Kafka-->
+ <!-- TODO
+ Spark Streaming Kafka 0-8 is not cross-compiled with Scala 2.12.
+ Thus, this dependency has been temporarily un-banned in the enforcer plugin.
+ It will be removed with the structured streaming implementation.
+ -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka-0-8_${scala211.binary.version}</artifactId>
+ <version>${spark-streaming-kafka.version}</version>
<exclusions>
<exclusion>
- <groupId>commons-httpclient</groupId>
- <artifactId>commons-httpclient</artifactId>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-xml_2.11</artifactId>
</exclusion>
<exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
+ <groupId>org.scala-lang.modules</groupId>
+ <artifactId>scala-parser-combinators_2.11</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <!--jackson-->
+ <!-- MongoDB-->
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
- <version>${jackson.version}</version>
+ <groupId>org.mongodb.scala</groupId>
+ <artifactId>mongo-scala-driver_${scala.binary.version}</artifactId>
+ <version>${mongo.version}</version>
</dependency>
- <!--scalaj for http request-->
+ <!-- MySql -->
<dependency>
- <groupId>org.scalaj</groupId>
- <artifactId>scalaj-http_${scala.binary.version}</artifactId>
- <version>${scalaj.version}</version>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql.java.version}</version>
</dependency>
- <!--mongo request-->
+ <!-- MariaDB -->
<dependency>
- <groupId>org.mongodb.scala</groupId>
- <artifactId>mongo-scala-driver_2.11</artifactId>
- <version>${mongo.version}</version>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>${mariadb.version}</version>
</dependency>
- <!--avro-->
+ <!-- Spark Cassandra-->
<dependency>
- <groupId>com.databricks</groupId>
- <artifactId>spark-avro_${scala.binary.version}</artifactId>
- <version>4.0.0</version>
+ <groupId>com.datastax.spark</groupId>
+ <artifactId>spark-cassandra-connector_${scala.binary.version}</artifactId>
+ <version>${cassandra.connector.version}</version>
</dependency>
- <!--log4j-->
+ <!-- Spark Elasticsearch-->
+ <!-- TODO
+ Elasticsearch Spark has recently added support for Scala 2.12, but it is not yet published to Maven Central.
+ Thus, this dependency has been temporarily un-banned in the enforcer plugin.
+ See https://github.com/elastic/elasticsearch-hadoop/pull/1589#issuecomment-776920189
+ -->
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch-spark-20_${scala211.binary.version}</artifactId>
+ <version>${elasticsearch.version}</version>
</dependency>
+ <!-- Miscellaneous-->
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>${curator.version}</version>
</dependency>
+
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>${log4j.version}</version>
+ <groupId>org.scalaj</groupId>
+ <artifactId>scalaj-http_${scala.binary.version}</artifactId>
+ <version>${scalaj.version}</version>
</dependency>
- <!--curator-->
<dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${curator.version}</version>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.9</version>
</dependency>
- <!--junit-->
+ <!-- Test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <scope>test</scope>
</dependency>
- <!--scala test-->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>${mockito.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <groupId>org.scalatestplus</groupId>
+ <artifactId>mockito-3-4_${scala.binary.version}</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>${mysql.java.version}</version>
- </dependency>
- <dependency>
- <groupId>com.datastax.spark</groupId>
- <artifactId>spark-cassandra-connector_2.11</artifactId>
- <version>${cassandra.connector.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.5.9</version>
- </dependency>
- <dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch-spark-20_2.11</artifactId>
- <version>${elasticsearch.version}</version>
- </dependency>
+
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.14.3</version>
<scope>test</scope>
</dependency>
+
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
+
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
- <version>3.3.1</version>
+ <version>4.3.0</version>
<executions>
<execution>
- <id>compile</id>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
- <phase>compile</phase>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
+ <checkMultipleScalaVersions>true</checkMultipleScalaVersions>
+ <failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
+ <recompileMode>incremental</recompileMode>
<args>
+ <arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
- <arg>-unchecked</arg>
+ <arg>-explaintypes</arg>
</args>
+ <jvmArgs>
+ <jvmArg>-Xms64m</jvmArg>
+ <jvmArg>-Xmx1024m</jvmArg>
+ </jvmArgs>
+ <javacArgs>
+ <javacArg>-source</javacArg>
+ <javacArg>-target</javacArg>
+ <javacArg>${java.version}</javacArg>
+ <javacArg>-Xlint:all,-serial,-path,-try</javacArg>
+ </javacArgs>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- </plugin>
+
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <testFailureIgnore>false</testFailureIgnore>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>TestSuiteReport.txt</filereports>
+ <stderr/>
+ <systemProperties>
+ <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+ </systemProperties>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>3.1.0</version>
- <configuration>
- <createDependencyReducedPom>false</createDependencyReducedPom>
- </configuration>
+ <version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
- <configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.griffin.measure.Application</mainClass>
- </transformer>
- </transformers>
- <relocations>
- <relocation>
- <pattern>org.apache.http</pattern>
- <shadedPattern>griffin.org.apache.http</shadedPattern>
- </relocation>
- </relocations>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/maven/**</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
</execution>
</executions>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <minimizeJar>true</minimizeJar>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>griffin.org.apache.http</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
</plugin>
+
<plugin>
<groupId>org.antipathy</groupId>
<artifactId>mvn-scalafmt_${scala.binary.version}</artifactId>
- <version>0.12_1.5.1</version>
+ <version>${scalafmt.version}</version>
<configuration>
- <parameters>${scalafmt.parameters}</parameters>
- <skip>${scalafmt.skip}</skip>
- <skipSources>${scalafmt.skip}</skipSources>
- <skipTestSources>${scalafmt.skip}</skipTestSources>
<configLocation>${project.parent.basedir}/.scalafmt.conf</configLocation>
+ <skipTestSources>false</skipTestSources>
+ <skipSources>false</skipSources>
+ <respectVersion>false</respectVersion>
+ <validateOnly>false</validateOnly>
</configuration>
<executions>
<execution>
- <phase>validate</phase>
+ <phase>compile</phase>
<goals>
<goal>format</goal>
</goals>
</execution>
</executions>
</plugin>
+
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
@@ -331,21 +350,81 @@ under the License.
</configuration>
<executions>
<execution>
- <phase>validate</phase>
+ <phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>0.8.2</version>
+ <executions>
+ <execution>
+ <id>default-prepare-agent</id>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>report</id>
+ <phase>test</phase>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scoverage</groupId>
+ <artifactId>scoverage-maven-plugin</artifactId>
+ <version>${scoverage.plugin.version}</version>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ <aggregate>true</aggregate>
+ <highlighting>true</highlighting>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.2.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <classifier>spark-${spark.major.version}_scala-${scala.binary.version}</classifier>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
+ <version>3.3.0</version>
<configuration>
- <descriptors>
- <descriptor>assembly.xml</descriptor>
- </descriptors>
- <finalName>${artifactId}-${project.version}</finalName>
+ <tarLongFileMode>posix</tarLongFileMode>
+ <finalName>
+ ${project.artifactId}-${project.version}-spark-${spark.major.version}_scala-${scala.binary.version}
+ </finalName>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.griffin.measure.Application</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
</configuration>
<executions>
<execution>
@@ -357,6 +436,172 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>3.0.0-M2</version>
+ <executions>
+ <execution>
+ <id>enforce-versions</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireJavaVersion>
+ <version>${java.version}</version>
+ </requireJavaVersion>
+ <bannedDependencies>
+ <excludes>
+ <exclude>*:*_2.10</exclude>
+ <exclude>*:*_2.12</exclude>
+ <exclude>*:*_2.13</exclude>
+ </excludes>
+ <searchTransitive>true</searchTransitive>
+ </bannedDependencies>
+ </rules>
+ </configuration>
+ </execution>
+ <execution>
+ <id>enforce-no-duplicate-dependencies</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <banDuplicatePomDependencyVersions/>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
+
+ <profiles>
+ <!-- Scala 2.12 -->
+ <profile>
+ <id>scala-2.12</id>
+ <activation>
+ <property>
+ <name>scala-2.12</name>
+ </property>
+ </activation>
+
+ <properties>
+ <scala.binary.version>2.12</scala.binary.version>
+ <scala.version>${scala.binary.version}.12</scala.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>enforce-versions</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireJavaVersion>
+ <version>${java.version}</version>
+ </requireJavaVersion>
+ <bannedDependencies>
+ <!-- TODO Temporary Fix: These will be removed in a future iteration. -->
+ <includes>
+ <include>org.elasticsearch:elasticsearch-spark-20_2.11:*</include>
+ <include>org.apache.spark:spark-streaming-kafka-0-8_2.11:*</include>
+ <include>org.apache.kafka:kafka_2.11:*</include>
+ </includes>
+ <excludes>
+ <exclude>*:*_2.10</exclude>
+ <exclude>*:*_2.11</exclude>
+ <exclude>*:*_2.13</exclude>
+ </excludes>
+ <searchTransitive>true</searchTransitive>
+ </bannedDependencies>
+ </rules>
+ </configuration>
+ </execution>
+ <execution>
+ <id>enforce-no-duplicate-dependencies</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <banDuplicatePomDependencyVersions/>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!-- Spark 2.3.x -->
+ <profile>
+ <id>spark-2.3</id>
+ <activation>
+ <property>
+ <name>spark-2.3</name>
+ </property>
+ </activation>
+
+ <properties>
+ <spark.major.version>2.3</spark.major.version>
+ <spark.version>${spark.major.version}.4</spark.version>
+ </properties>
+
+ <dependencies>
+ <!-- Old Spark Avro-->
+ <dependency>
+ <groupId>com.databricks</groupId>
+ <artifactId>spark-avro_${scala211.binary.version}</artifactId>
+ <version>4.0.0</version>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ <!-- Spark 3.0.x -->
+ <profile>
+ <id>spark-3.0</id>
+ <activation>
+ <property>
+ <name>spark-3.0</name>
+ </property>
+ </activation>
+
+ <properties>
+ <spark.major.version>3.0</spark.major.version>
+ <spark.version>${spark.major.version}.2</spark.version>
+ <spark-streaming-kafka.version>2.4.7</spark-streaming-kafka.version>
+ </properties>
+ </profile>
+
+ <!-- Spark 2.4+ Avro -->
+ <profile>
+ <id>spark-avro-2.4</id>
+ <activation>
+ <property>
+ <name>!spark-2.3</name>
+ </property>
+ </activation>
+
+ <dependencies>
+ <!-- New Spark Avro-->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-avro_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+
+ </profiles>
</project>
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index 04775c7..643ff5e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -184,14 +184,15 @@ trait StreamingCacheClient
}
// new cache data
- val newDfOpt = try {
- val dfr = sparkSession.read
- readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
- } catch {
- case e: Throwable =>
- warn(s"read data source cache warn: ${e.getMessage}")
- None
- }
+ val newDfOpt =
+ try {
+ val dfr = sparkSession.read
+ readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
+ } catch {
+ case e: Throwable =>
+ warn(s"read data source cache warn: ${e.getMessage}")
+ None
+ }
// old cache data
val oldCacheIndexOpt = if (updatable) readOldCacheIndex() else None
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index 39827cf..0b08dd4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -32,8 +32,15 @@ import org.apache.griffin.measure.datasource.connector.streaming._
object DataConnectorFactory extends Loggable {
- @deprecated val AvroRegex: Regex = """^(?i)avro$""".r
- @deprecated val TextDirRegex: Regex = """^(?i)text-dir$""".r
+ @deprecated(
+ s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}' with correct format.",
+ "0.6.0")
+ val AvroRegex: Regex = """^(?i)avro$""".r
+
+ @deprecated(
+ s"This class is deprecated. Use '${classOf[FileBasedDataConnector].getCanonicalName}' with correct format.",
+ "0.6.0")
+ val TextDirRegex: Regex = """^(?i)text-dir$""".r
val HiveRegex: Regex = """^(?i)hive$""".r
val FileRegex: Regex = """^(?i)file$""".r
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
index d135f3b..2fd2912 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala
@@ -52,33 +52,34 @@ case class CassandraDataConnector(
override def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- sparkSession.conf.set("spark.cassandra.connection.host", host)
- sparkSession.conf.set("spark.cassandra.connection.port", port)
- sparkSession.conf.set("spark.cassandra.auth.username", user)
- sparkSession.conf.set("spark.cassandra.auth.password", password)
-
- val tableDef: DataFrameReader = sparkSession.read
- .format("org.apache.spark.sql.cassandra")
- .options(Map("table" -> tableName, "keyspace" -> database))
-
- val dataWh: String = dataWhere()
-
- var data: DataFrame = null
- if (wheres.length > 0) {
- data = tableDef.load().where(dataWh)
- } else {
- data = tableDef.load()
+ val dfOpt =
+ try {
+ sparkSession.conf.set("spark.cassandra.connection.host", host)
+ sparkSession.conf.set("spark.cassandra.connection.port", port)
+ sparkSession.conf.set("spark.cassandra.auth.username", user)
+ sparkSession.conf.set("spark.cassandra.auth.password", password)
+
+ val tableDef: DataFrameReader = sparkSession.read
+ .format("org.apache.spark.sql.cassandra")
+ .options(Map("table" -> tableName, "keyspace" -> database))
+
+ val dataWh: String = dataWhere()
+
+ var data: DataFrame = null
+ if (wheres.length > 0) {
+ data = tableDef.load().where(dataWh)
+ } else {
+ data = tableDef.load()
+ }
+
+ val dfOpt = Some(data)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable =>
+ error(s"load cassandra table $database.$TableName fails: ${e.getMessage}", e)
+ None
}
-
- val dfOpt = Some(data)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load cassandra table $database.$TableName fails: ${e.getMessage}", e)
- None
- }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
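The reformatted Cassandra block still carries a `var data: DataFrame = null` that the if/else assigns into. Since if/else is an expression in Scala, the same load can be written null-free; a hedged sketch (helper name and logging are illustrative, assuming the Spark Cassandra connector format string used in the diff):

```scala
import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}

// Same logic as the diff's data(ms) body, but the where-clause choice is
// an expression, so no null-initialized var is needed.
def loadCassandraTable(
    spark: SparkSession,
    keyspace: String,
    table: String,
    whereClause: Option[String]): Option[DataFrame] =
  try {
    val reader: DataFrameReader = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> table, "keyspace" -> keyspace))
    // fold: load unfiltered when no predicate, otherwise apply it.
    Some(whereClause.fold(reader.load())(w => reader.load().where(w)))
  } catch {
    case e: Throwable =>
      Console.err.println(s"load cassandra table $keyspace.$table fails: ${e.getMessage}")
      None
  }
```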
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
index dfdacc8..597695e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -77,23 +77,24 @@ case class ElasticSearchGriffinDataConnector(
def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
val path: String = s"/_sql?format=csv"
info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
- val dfOpt = try {
- val answer = httpPost(path, sql)
- if (answer._1) {
- import sparkSession.implicits._
- val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
- val reader: DataFrameReader = sparkSession.read
- reader.option("header", true).option("inferSchema", true)
- val df: DataFrame = reader.csv(rdd.toDS())
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } else None
- } catch {
- case e: Throwable =>
- error(s"load ES by sql $host:$port $sql fails: ${e.getMessage}", e)
- None
- }
+ val dfOpt =
+ try {
+ val answer = httpPost(path, sql)
+ if (answer._1) {
+ import sparkSession.implicits._
+ val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
+ val reader: DataFrameReader = sparkSession.read
+ reader.option("header", true).option("inferSchema", true)
+ val df: DataFrame = reader.csv(rdd.toDS())
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } else None
+ } catch {
+ case e: Throwable =>
+ error(s"load ES by sql $host:$port $sql fails: ${e.getMessage}", e)
+ None
+ }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
@@ -102,44 +103,44 @@ case class ElasticSearchGriffinDataConnector(
val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
- val dfOpt = try {
- val answer = httpGet(path)
- val data = ArrayBuffer[Map[String, Number]]()
-
- if (answer._1) {
- val arrayAnswers: util.Iterator[JsonNode] =
- parseString(answer._2).get("hits").get("hits").elements()
-
- while (arrayAnswers.hasNext) {
- val answer = arrayAnswers.next()
- val values = answer.get("_source").get("value")
- val fields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
- val fieldsMap = mutable.Map[String, Number]()
- while (fields.hasNext) {
- val fld: util.Map.Entry[String, JsonNode] = fields.next()
- fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+ val dfOpt =
+ try {
+ val answer = httpGet(path)
+ val data = ArrayBuffer[Map[String, Number]]()
+
+ if (answer._1) {
+ val arrayAnswers: util.Iterator[JsonNode] =
+ parseString(answer._2).get("hits").get("hits").elements()
+
+ while (arrayAnswers.hasNext) {
+ val answer = arrayAnswers.next()
+ val values = answer.get("_source").get("value")
+ val fields: util.Iterator[util.Map.Entry[String, JsonNode]] = values.fields()
+ val fieldsMap = mutable.Map[String, Number]()
+ while (fields.hasNext) {
+ val fld: util.Map.Entry[String, JsonNode] = fields.next()
+ fieldsMap.put(fld.getKey, fld.getValue.numberValue())
+ }
+ data += fieldsMap.toMap
}
- data += fieldsMap.toMap
}
+ val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
+ val columns: Array[String] = fields.toArray
+ val defaultNumber: Number = 0.0
+ val rdd: RDD[Row] = rdd1
+ .map { x: Map[String, Number] =>
+ Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
+ }
+ val schema = dfSchema(columns.toList)
+ val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable =>
+ error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
+ None
}
- val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
- val columns: Array[String] = fields.toArray
- val defaultNumber: Number = 0.0
- val rdd: RDD[Row] = rdd1
- .map { x: Map[String, Number] =>
- Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
- }
- val schema = dfSchema(columns.toList)
- val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
- df.show(20)
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
- None
- }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
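The `dataBySql` path above parses the HTTP response body as CSV by way of a `Dataset[String]`. Note that `answer._2.lines` resolves through Scala's `StringOps`, which collides with JDK 11's `String.lines()` (a Java `Stream`), one of the cross-version hazards this PR is concerned with. A hedged sketch of the same CSV-from-string step that sidesteps the ambiguity (helper name is illustrative):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Turn an in-memory CSV payload into a DataFrame. Splitting on the \R
// line-break class explicitly avoids the StringOps.lines vs JDK-11
// String.lines() resolution difference.
def csvStringToDf(spark: SparkSession, csv: String): DataFrame = {
  import spark.implicits._
  val lines = csv.split("\\R").toSeq.toDS() // Dataset[String]
  spark.read
    .option("header", true)
    .option("inferSchema", true)
    .csv(lines)
}
```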
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
index 86049bd..2e4f482 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnector.scala
@@ -79,7 +79,14 @@ case class FileBasedDataConnector(
SupportedFormats.contains(format),
s"Invalid format '$format' specified. Must be one of ${SupportedFormats.mkString("['", "', '", "']")}")
- if (format == "csv") validateCSVOptions()
+ if (format.equalsIgnoreCase("avro") && sparkSession.version < "2.3.0") {
+ format = "com.databricks.spark.avro"
+ }
+
+ if (format == "csv") {
+ validateCSVOptions()
+ }
+
if (format == "tsv") {
format = "csv"
options.getOrElseUpdate(Delimiter, TabDelimiter)
@@ -169,7 +176,8 @@ object FileBasedDataConnector extends Loggable {
private val TabDelimiter: String = "\t"
private val DefaultFormat: String = SQLConf.DEFAULT_DATA_SOURCE_NAME.defaultValueString
- private val SupportedFormats: Seq[String] = Seq("parquet", "orc", "avro", "text", "csv", "tsv")
+ private val SupportedFormats: Seq[String] =
+ Seq("parquet", "orc", "avro", "text", "csv", "tsv", "com.databricks.spark.avro")
/**
* Validates the existence of paths in a given sequence.
@@ -189,7 +197,7 @@ object FileBasedDataConnector extends Loggable {
else throw new IllegalArgumentException(msg)
false
- })
+ })
assert(validPaths.nonEmpty, "No paths were given for the data source.")
validPaths
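The `FileBasedDataConnector` hunk rewrites the `avro` short name to `com.databricks.spark.avro` on older runtimes: the built-in Avro source only ships with newer Spark (2.4 introduced `spark-avro` as a first-party module), while earlier versions need the external Databricks package. The diff compares version strings lexicographically (`sparkSession.version < "2.3.0"`), which only stays correct while Spark minors are single-digit (`"2.10.0" < "2.3.0"` is true as strings). A sketch of a sturdier gate (function name and the 2.4 threshold are illustrative; the patch itself gates at 2.3.0):

```scala
// Parse major.minor and pick the format name accordingly, rather than
// comparing raw version strings.
def resolveAvroFormat(sparkVersion: String): String = {
  val Array(major, minor) = sparkVersion.split("\\.").take(2).map(_.toInt)
  if (major > 2 || (major == 2 && minor >= 4)) "avro" // built-in module
  else "com.databricks.spark.avro"                    // external package
}
```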
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
index 9fc21d7..2f77db3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnector.scala
@@ -69,27 +69,28 @@ case class JDBCBasedDataConnector(
assert(isJDBCDriverLoaded(driver), s"JDBC driver $driver not present in classpath")
override def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- val dtSql = createSqlStmt()
- val prop = new java.util.Properties
- prop.setProperty("user", user)
- prop.setProperty("password", password)
- prop.setProperty("driver", driver)
- val dfOpt = Try(sparkSession.read.jdbc(url, s"($dtSql) as t", prop))
+ val dfOpt =
+ try {
+ val dtSql = createSqlStmt()
+ val prop = new java.util.Properties
+ prop.setProperty("user", user)
+ prop.setProperty("password", password)
+ prop.setProperty("driver", driver)
+ val dfOpt = Try(sparkSession.read.jdbc(url, s"($dtSql) as t", prop))
- dfOpt match {
- case Success(_) =>
- case Failure(exception) =>
- griffinLogger.error("Error occurred while reading data set.", exception)
- }
+ dfOpt match {
+ case Success(_) =>
+ case Failure(exception) =>
+ griffinLogger.error("Error occurred while reading data set.", exception)
+ }
- val preDfOpt = preProcess(dfOpt.toOption, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"loading table $fullTableName fails: ${e.getMessage}", e)
- None
- }
+ val preDfOpt = preProcess(dfOpt.toOption, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable =>
+ error(s"loading table $fullTableName fails: ${e.getMessage}", e)
+ None
+ }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
@@ -99,7 +100,7 @@ case class JDBCBasedDataConnector(
*/
private def createSqlStmt(): String = {
val tableClause = s"SELECT * FROM $fullTableName"
- if (whereString.length > 0) {
+ if (whereString.nonEmpty) {
s"$tableClause WHERE $whereString"
} else tableClause
}
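The JDBC connector wraps the read in `Try`, logs on `Failure`, then collapses to `Option` with `toOption`. That attempt-log-collapse shape can be sketched without any Spark dependency (helper name is illustrative):

```scala
import scala.util.{Failure, Success, Try}

// Attempt an operation; report the error through the supplied logger on
// failure; return Some(result) or None. Mirrors the diff's
// Try(...read.jdbc(...)) -> match -> toOption sequence.
def readLogged[A](op: => A)(log: Throwable => Unit): Option[A] = {
  val result = Try(op)
  result match {
    case Success(_) => // nothing to report
    case Failure(e) => log(e)
  }
  result.toOption
}
```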
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
index feacfc9..26321fc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/MySqlDataConnector.scala
@@ -52,21 +52,22 @@ case class MySqlDataConnector(
override def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- val dtSql = dataSql()
- val prop = new java.util.Properties
- prop.setProperty("user", user)
- prop.setProperty("password", password)
- prop.setProperty("driver", driver)
- val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable =>
- error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
- None
- }
+ val dfOpt =
+ try {
+ val dtSql = dataSql()
+ val prop = new java.util.Properties
+ prop.setProperty("user", user)
+ prop.setProperty("password", password)
+ prop.setProperty("driver", driver)
+ val df: DataFrame = sparkSession.read.jdbc(url, s"($dtSql) as t", prop)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable =>
+ error(s"load mysql table $fullTableName fails: ${e.getMessage}", e)
+ None
+ }
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 0c501dc..b0be557 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -51,23 +51,24 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
}
ds.foreachRDD((rdd, time) => {
val ms = time.milliseconds
- val saveDfOpt = try {
- // coalesce partition number
- val prlCount = rdd.sparkContext.defaultParallelism
- val ptnCount = rdd.getNumPartitions
- val repartitionedRdd = if (prlCount < ptnCount) {
- rdd.coalesce(prlCount)
- } else rdd
-
- val dfOpt = transform(repartitionedRdd)
-
- // pre-process
- preProcess(dfOpt, ms)
- } catch {
- case e: Throwable =>
- error(s"streaming data connector error: ${e.getMessage}")
- None
- }
+ val saveDfOpt =
+ try {
+ // coalesce partition number
+ val prlCount = rdd.sparkContext.defaultParallelism
+ val ptnCount = rdd.getNumPartitions
+ val repartitionedRdd = if (prlCount < ptnCount) {
+ rdd.coalesce(prlCount)
+ } else rdd
+
+ val dfOpt = transform(repartitionedRdd)
+
+ // pre-process
+ preProcess(dfOpt, ms)
+ } catch {
+ case e: Throwable =>
+ error(s"streaming data connector error: ${e.getMessage}")
+ None
+ }
// save data frame
streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
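The streaming connector shrinks each micro-batch RDD to the default parallelism before transforming it. `coalesce` is used (rather than `repartition`) because it merges partitions without a shuffle, which is only possible when *reducing* the count, hence the `prlCount < ptnCount` guard. A sketch of that decision in isolation (helper name is illustrative):

```scala
import org.apache.spark.rdd.RDD

// Cheaply reduce partitions when the RDD is over-partitioned; coalesce
// avoids a shuffle but can only shrink, so otherwise leave the RDD as-is.
def rightSize[T](rdd: RDD[T]): RDD[T] = {
  val target = rdd.sparkContext.defaultParallelism
  if (target < rdd.getNumPartitions) rdd.coalesce(target) else rdd
}
```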
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
index 743bba9..9aa27f7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
@@ -43,7 +43,7 @@ trait BasicAnalyzer extends Serializable {
v :+ se
case _ => v
}
- }
+ }
val combSelectionExprs: (Seq[SelectionExpr], Seq[SelectionExpr]) => Seq[SelectionExpr] =
(a: Seq[SelectionExpr], b: Seq[SelectionExpr]) => a ++ b
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 4f4bf99..980bded 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -33,20 +33,21 @@ case class MetricFlushStep() extends WriteStep {
def execute(context: DQContext): Try[Boolean] = Try {
context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
val (t, metric) = pair
- val pr = try {
- context.getSinks(t).foreach { sink =>
- try {
- sink.sinkMetrics(metric)
- } catch {
- case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+ val pr =
+ try {
+ context.getSinks(t).foreach { sink =>
+ try {
+ sink.sinkMetrics(metric)
+ } catch {
+ case e: Throwable => error(s"sink metrics error: ${e.getMessage}", e)
+ }
}
+ true
+ } catch {
+ case e: Throwable =>
+ error(s"flush metrics error: ${e.getMessage}", e)
+ false
}
- true
- } catch {
- case e: Throwable =>
- error(s"flush metrics error: ${e.getMessage}", e)
- false
- }
ret && pr
}
}
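`MetricFlushStep` folds a success flag across all (sink, metric) pairs so that one failing sink marks the step failed without stopping the remaining sinks from flushing. The accumulation pattern, sketched without Spark (helper name is illustrative):

```scala
// Run every operation regardless of earlier failures; the overall result
// is true only if all of them succeeded. Same shape as the diff's
// foldLeft(true) { (ret, pair) => ... ret && pr }.
def flushAll[A](items: Seq[A])(op: A => Unit): Boolean =
  items.foldLeft(true) { (ok, item) =>
    val thisOk =
      try { op(item); true }
      catch { case _: Throwable => false }
    ok && thisOk
  }
```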
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
index b7f50e8..680f7a8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
@@ -38,13 +38,14 @@ object FSUtil extends Loggable {
fsMap.get(uri.getScheme) match {
case Some(fs) => fs
case _ =>
- val fs = try {
- FileSystem.get(uri, getConfiguration)
- } catch {
- case e: Throwable =>
- error(s"get file system error: ${e.getMessage}", e)
- throw e
- }
+ val fs =
+ try {
+ FileSystem.get(uri, getConfiguration)
+ } catch {
+ case e: Throwable =>
+ error(s"get file system error: ${e.getMessage}", e)
+ throw e
+ }
fsMap += (uri.getScheme -> fs)
fs
}
@@ -60,11 +61,12 @@ object FSUtil extends Loggable {
}
private def getUriOpt(path: String): Option[URI] = {
- val uriOpt = try {
- Some(new URI(path))
- } catch {
- case _: Throwable => None
- }
+ val uriOpt =
+ try {
+ Some(new URI(path))
+ } catch {
+ case _: Throwable => None
+ }
uriOpt.flatMap { uri =>
if (uri.getScheme == null) {
try {
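`getUriOpt` is a plain "parse or None"; the same try/catch can be expressed in one line with `Try(...).toOption`, which swallows the `URISyntaxException` and yields `None` on failure (function name is illustrative):

```scala
import java.net.URI
import scala.util.Try

// One-line equivalent of the diff's try { Some(new URI(path)) } catch { ... None }.
def parseUri(path: String): Option[URI] = Try(new URI(path)).toOption
```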
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
index 52c5f38..bc1e292 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
@@ -21,10 +21,6 @@ import java.util.concurrent._
import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration.Duration
-import scala.concurrent.forkjoin.{
- ForkJoinPool => SForkJoinPool,
- ForkJoinWorkerThread => SForkJoinWorkerThread
-}
import scala.util.control.NonFatal
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
@@ -172,15 +168,15 @@ private[griffin] object ThreadUtils {
/**
* Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
*/
- def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+ def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = {
// Custom factory to set thread names
- val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
- override def newThread(pool: SForkJoinPool): SForkJoinWorkerThread =
- new SForkJoinWorkerThread(pool) {
+ val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
+ override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
+ new ForkJoinWorkerThread(pool) {
setName(prefix + "-" + super.getName)
}
}
- new SForkJoinPool(
+ new ForkJoinPool(
maxThreadNumber,
factory,
null, // handler
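The `ThreadUtils` hunk drops the `scala.concurrent.forkjoin` aliases in favor of `java.util.concurrent` directly: the Scala package became deprecated aliases in 2.12 and disappears later, so only the JDK classes compile across both 2.11 and 2.12. A self-contained version of the construction (prefix value is illustrative):

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}

// Named ForkJoinPool built against the JDK classes available on every
// supported Scala version; the factory prefixes each worker thread name.
def namedForkJoinPool(prefix: String, maxThreads: Int): ForkJoinPool = {
  val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
    override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread =
      new ForkJoinWorkerThread(pool) {
        setName(prefix + "-" + super.getName)
      }
  }
  new ForkJoinPool(maxThreads, factory, null /* handler */, false /* asyncMode */)
}
```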
diff --git a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
index dbed89c..011122f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/SparkSuiteBase.scala
@@ -22,9 +22,10 @@ import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
-import org.scalatest.{BeforeAndAfterAll, FlatSpec}
+import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
-trait SparkSuiteBase extends FlatSpec with BeforeAndAfterAll {
+trait SparkSuiteBase extends AnyFlatSpec with BeforeAndAfterAll {
@transient var spark: SparkSession = _
@transient var sc: SparkContext = _
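Every test-file hunk below tracks the same ScalaTest move: `FlatSpec` became `org.scalatest.flatspec.AnyFlatSpec` and `Matchers` became `org.scalatest.matchers.should.Matchers` in the 3.x modularization, with the old top-level names deprecated and later removed. A minimal suite in the new style (class name is illustrative, assuming a ScalaTest 3.1+ dependency):

```scala
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

// Post-migration spelling of "FlatSpec with Matchers".
class ExampleSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
  "a list" should "report its size" in {
    List(1, 2, 3) should have size 3
  }
}
```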
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
index efaa91f..8f191bc 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
@@ -17,7 +17,9 @@
package org.apache.griffin.measure.configuration.dqdefinition.reader
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest._
+import flatspec.AnyFlatSpec
+import matchers.should._
import org.apache.griffin.measure.configuration.dqdefinition.{
DQConfig,
@@ -26,7 +28,7 @@ import org.apache.griffin.measure.configuration.dqdefinition.{
RuleParam
}
-class ParamEnumReaderSpec extends FlatSpec with Matchers {
+class ParamEnumReaderSpec extends AnyFlatSpec with Matchers {
import org.apache.griffin.measure.configuration.enums.DslType._
"dsltype" should "be parsed to predefined set of values" in {
val validDslSparkSqlValues =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index a05df90..94c7e07 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -22,8 +22,10 @@ import scala.util.{Failure, Success}
import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
-
-class ParamFileReaderSpec extends FlatSpec with Matchers {
+import org.scalatest._
+import flatspec.AnyFlatSpec
+import matchers.should._
+class ParamFileReaderSpec extends AnyFlatSpec with Matchers {
"params " should "be parsed from a valid file" in {
val reader: ParamReader =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index 86d68b5..d8bf125 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -18,14 +18,15 @@
package org.apache.griffin.measure.configuration.dqdefinition.reader
import scala.io.Source
-
-import org.scalatest.{FlatSpec, Matchers}
import scala.util.{Failure, Success}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
-class ParamJsonReaderSpec extends FlatSpec with Matchers {
+class ParamJsonReaderSpec extends AnyFlatSpec with Matchers {
"params " should "be parsed from a valid file" in {
val bufferedSource =
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
index 898c8fd..6f5717a 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/DataFrameCacheTest.scala
@@ -19,11 +19,12 @@ package org.apache.griffin.measure.context
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.SparkSuiteBase
-class DataFrameCacheTest extends FlatSpec with Matchers with SparkSuiteBase {
+class DataFrameCacheTest extends AnyFlatSpec with Matchers with SparkSuiteBase {
def createDataFrame(arr: Seq[Int]): DataFrame = {
val schema = StructType(
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
index 7d25cb1..9a63abd 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/MetricWrapperTest.scala
@@ -17,9 +17,10 @@
package org.apache.griffin.measure.context
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
-class MetricWrapperTest extends FlatSpec with Matchers {
+class MetricWrapperTest extends AnyFlatSpec with Matchers {
"metric wrapper" should "flush empty if no metric inserted" in {
val metricWrapper = MetricWrapper("name", "appId")
diff --git a/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala b/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
index 28c195b..9648d96 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/context/TimeRangeTest.scala
@@ -17,9 +17,10 @@
package org.apache.griffin.measure.context
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
-class TimeRangeTest extends FlatSpec with Matchers {
+class TimeRangeTest extends AnyFlatSpec with Matchers {
"time range" should "be able to merge another time range" in {
val tr1 = TimeRange(1, 10, Set(2, 5, 8))
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
index a90768e..c60b439 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/TimestampStorageTest.scala
@@ -17,9 +17,10 @@
package org.apache.griffin.measure.datasource
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
-class TimestampStorageTest extends FlatSpec with Matchers {
+class TimestampStorageTest extends AnyFlatSpec with Matchers {
"timestamp storage" should "be able to insert a timestamp" in {
val timestampStorage = TimestampStorage()
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
index 466196b..fd4e6a5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactorySpec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
-import org.scalatest.FlatSpec
+import org.scalatest.flatspec.AnyFlatSpec
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
@@ -75,7 +75,7 @@ class DataConnectorWithoutApply extends BatchDataConnector {
override def data(ms: Long): (Option[DataFrame], TimeRange) = null
}
-class DataConnectorFactorySpec extends FlatSpec {
+class DataConnectorFactorySpec extends AnyFlatSpec {
"DataConnectorFactory" should "be able to create custom batch connector" in {
val param = DataConnectorParam(
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
index 60c50ca..37c63fc 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchDataConnectorTest.scala
@@ -18,13 +18,15 @@
package org.apache.griffin.measure.datasource.connector.batch
import org.apache.spark.sql.types.StructType
-import org.scalatest.Matchers
+import org.scalatest.matchers.should._
+import org.scalatest.Ignore
import org.testcontainers.elasticsearch.ElasticsearchContainer
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
+@Ignore
class ElasticSearchDataConnectorTest extends SparkSuiteBase with Matchers {
// ignorance flag that could skip cases
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
index 2493df3..2ae65be 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/FileBasedDataConnectorTest.scala
@@ -18,7 +18,7 @@
package org.apache.griffin.measure.datasource.connector.batch
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
-import org.scalatest._
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
@@ -65,13 +65,12 @@ class FileBasedDataConnectorTest extends SparkSuiteBase with Matchers {
// valid schema
val result1 = FileBasedDataConnector(
spark,
- dcParam.copy(
- config = configs + (
- (
- "schema",
- Seq(
- Map("name" -> "name", "type" -> "string"),
- Map("name" -> "age", "type" -> "int", "nullable" -> "true"))))),
+ dcParam.copy(config = configs + (
+ (
+ "schema",
+ Seq(
+ Map("name" -> "name", "type" -> "string"),
+ Map("name" -> "age", "type" -> "int", "nullable" -> "true"))))),
timestampStorage)
.data(1L)
diff --git a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
index 77040e3..8a755b6 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/datasource/connector/batch/JDBCBasedDataConnectorTest.scala
@@ -19,7 +19,7 @@ package org.apache.griffin.measure.datasource.connector.batch
import java.sql.DriverManager
import java.util.Properties
-import org.scalatest.Matchers
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index a557dda..9fc9883 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -19,7 +19,7 @@ package org.apache.griffin.measure.job
import scala.util.{Failure, Success}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
import org.apache.griffin.measure.Application._
@@ -30,12 +30,7 @@ import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp
-class DQAppTest
- extends FlatSpec
- with SparkSuiteBase
- with BeforeAndAfterAll
- with Matchers
- with Loggable {
+class DQAppTest extends SparkSuiteBase with Matchers with Loggable {
var envParam: EnvConfig = _
var sparkParam: SparkParam = _
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
index e4754e0..8bf81b1 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -57,7 +57,7 @@ class CustomSinkTest extends SinkTestBase {
}
sinks.headOption match {
case Some(sink: CustomSink) => sink.allMetrics
- case _ => mutable.ListBuffer[String]()
+ case _ => Map.empty
}
})
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
index 919183b..b13858f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
@@ -19,14 +19,15 @@ package org.apache.griffin.measure.sink
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context.{ContextId, DQContext}
-trait SinkTestBase extends FlatSpec with Matchers with SparkSuiteBase with Loggable {
+trait SinkTestBase extends AnyFlatSpec with Matchers with SparkSuiteBase with Loggable {
var sinkParams: Seq[SinkParam]
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index 834d8e0..5892c11 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -17,15 +17,17 @@
package org.apache.griffin.measure.step
-import org.scalatest._
import scala.util.Try
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+
import org.apache.griffin.measure.{Loggable, SparkSuiteBase}
import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context.{ContextId, DQContext}
import org.apache.griffin.measure.step.transform.TransformStep
-class TransformStepTest extends FlatSpec with Matchers with SparkSuiteBase with Loggable {
+class TransformStepTest extends AnyFlatSpec with Matchers with SparkSuiteBase with Loggable {
case class DualTransformStep(
name: String,
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
index 67e5236..f80fb13 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQStepsTest.scala
@@ -17,15 +17,15 @@
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.scalatest._
-import org.scalatest.mockito.MockitoSugar
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
+import org.scalatestplus.mockito.MockitoSugar
-import org.apache.griffin.measure.configuration.dqdefinition.RuleErrorConfParam
-import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.dqdefinition.{RuleErrorConfParam, RuleParam}
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.builder.dsl.expr.Expr
-class CompletenessExpr2DQStepsTest extends FlatSpec with Matchers with MockitoSugar {
+class CompletenessExpr2DQStepsTest extends AnyFlatSpec with Matchers with MockitoSugar {
"CompletenessExpr2DQSteps" should "get correct where clause" in {
val completeness = CompletenessExpr2DQSteps(mock[DQContext], mock[Expr], mock[RuleParam])
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
index 2f06ad3..5b019a5 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -18,7 +18,8 @@
package org.apache.griffin.measure.transformations
import org.apache.spark.sql.DataFrame
-import org.scalatest._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition._
@@ -29,7 +30,10 @@ import org.apache.griffin.measure.job.builder.DQJobBuilder
case class AccuracyResult(total: Long, miss: Long, matched: Long, matchedFraction: Double)
-class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with SparkSuiteBase {
+class AccuracyTransformationsIntegrationTest
+ extends AnyFlatSpec
+ with Matchers
+ with SparkSuiteBase {
private val EMPTY_PERSON_TABLE = "empty_person"
private val PERSON_TABLE = "person"
@@ -80,10 +84,9 @@ class AccuracyTransformationsIntegrationTest extends FlatSpec with Matchers with
sourceName: String,
targetName: String,
expectedResult: AccuracyResult) = {
- val dqContext: DQContext = getDqContext(
- dataSourcesParam = List(
- DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
- DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
+ val dqContext: DQContext = getDqContext(dataSourcesParam = List(
+ DataSourceParam(name = "source", connector = dataConnectorParam(tableName = sourceName)),
+ DataSourceParam(name = "target", connector = dataConnectorParam(tableName = targetName))))
val accuracyRule = RuleParam(
dslType = "griffin-dsl",
diff --git a/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
index 720f9b2..9f204a8 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/utils/ParamUtilTest.scala
@@ -17,11 +17,13 @@
package org.apache.griffin.measure.utils
-import org.scalatest._
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should._
import org.apache.griffin.measure.utils.ParamUtil._
-class ParamUtilTest extends FlatSpec with Matchers with BeforeAndAfter {
+class ParamUtilTest extends AnyFlatSpec with Matchers with BeforeAndAfter {
val fruits: Map[String, Any] =
Map[String, Any]("A" -> "apple", "B" -> "banana", "O" -> "orange")
diff --git a/pom.xml b/pom.xml
index 964cf11..e51c94e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,13 +40,23 @@ under the License.
</prerequisites>
<properties>
+ <encoding>UTF-8</encoding>
+ <project.build.sourceEncoding>${encoding}</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>${encoding}</project.reporting.outputEncoding>
+
<java.version>1.8</java.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ <scala211.binary.version>2.11</scala211.binary.version>
+ <scala.version>${scala.binary.version}.0</scala.version>
+
+ <maven.compiler.source>${java.version}</maven.compiler.source>
+ <maven.compiler.target>${java.version}</maven.compiler.target>
+
<maven-apache-rat.version>0.11</maven-apache-rat.version>
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
+
+ <compile.scope>compile</compile.scope>
+ <provided.scope>provided</provided.scope>
</properties>
<licenses>
@@ -109,7 +119,7 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>3.6.1</version>
+ <version>3.8.1</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
@@ -119,28 +129,16 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.22.0</version>
+ <version>3.0.0-M3</version>
<configuration>
+ <testFailureIgnore>false</testFailureIgnore>
+ <useSystemClassLoader>true</useSystemClassLoader>
+ <forkMode>once</forkMode>
+ <failIfNoTests>false</failIfNoTests>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <argLine>-ea -Xmx1g -Xss4m -XX:ReservedCodeCacheSize=128m</argLine>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <!-- Scalatest runs all Scala tests -->
- <!-- enable scalatest -->
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <version>1.0</version>
- <configuration>
- <argLine>-ea -Xmx1g -Xss4m -XX:ReservedCodeCacheSize=128m</argLine>
+ <systemProperties>
+ <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+ </systemProperties>
</configuration>
<executions>
<execution>
@@ -151,6 +149,7 @@ under the License.
</execution>
</executions>
</plugin>
+
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
diff --git a/service/pom.xml b/service/pom.xml
index a1823a3..8121998 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -34,7 +34,6 @@ under the License.
<properties>
<hadoop.version>2.7.1</hadoop.version>
<hive.version>2.2.0</hive.version>
- <scala.version>2.10</scala.version>
<spring.boot.version>2.1.7.RELEASE</spring.boot.version>
<spring.security.kerberos.version>1.0.1.RELEASE</spring.security.kerberos.version>
<confluent.version>3.2.0</confluent.version>
@@ -258,7 +257,7 @@ under the License.
<!--livy-core-->
<dependency>
<groupId>com.cloudera.livy</groupId>
- <artifactId>livy-core_${scala.version}</artifactId>
+ <artifactId>livy-core_2.10</artifactId>
<version>${livy.core.version}</version>
</dependency>
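
The pom.xml hunk earlier in this diff introduces overridable properties (`scala.binary.version`, `scala.version`, `compile.scope`, `provided.scope`) that the commit's new Maven profiles are meant to flip per build target. As a minimal sketch of how such a profile plugs into those properties — the profile id and patch version below are assumptions for illustration, not copied from the actual measure/pom.xml (which is not shown in this excerpt) — it could look like:

```xml
<!-- Illustrative sketch only: the profile id "scala-2.12" and the
     2.12.10 patch version are assumed, not taken from measure/pom.xml. -->
<profiles>
  <profile>
    <id>scala-2.12</id>
    <properties>
      <!-- Overrides the default scala.binary.version (2.11) declared
           in the parent <properties> block above. -->
      <scala.binary.version>2.12</scala.binary.version>
      <scala.version>2.12.10</scala.version>
    </properties>
  </profile>
</profiles>
```

Activating the profile (e.g. `mvn package -P scala-2.12`) would then resolve every `_${scala.binary.version}`-suffixed artifact id against Scala 2.12 instead of 2.11 — the same profile-per-version strategy the commit message notes Apache Spark uses for its Scala and Hadoop builds.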