You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/09/29 10:21:53 UTC
[3/3] incubator-griffin git commit: Fix case clauses
Fix case clauses
Author: William Guo <gu...@apache.org>
Closes #425 from guoyuepeng/fix_case_clauses.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/18fc4cf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/18fc4cf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/18fc4cf4
Branch: refs/heads/master
Commit: 18fc4cf4cd706aa3e793fb0790c12ece5e5342a3
Parents: 485c5cf
Author: William Guo <gu...@apache.org>
Authored: Sat Sep 29 18:21:42 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Sat Sep 29 18:21:42 2018 +0800
----------------------------------------------------------------------
.../apache/griffin/measure/Application.scala | 36 ++-
.../configuration/dqdefinition/DQConfig.scala | 12 +-
.../configuration/dqdefinition/EnvConfig.scala | 5 +-
.../dqdefinition/reader/ParamFileReader.scala | 6 +-
.../dqdefinition/reader/ParamJsonReader.scala | 5 +-
.../dqdefinition/reader/ParamReader.scala | 5 +-
.../measure/configuration/enums/DqType.scala | 8 +-
.../configuration/enums/FlattenType.scala | 8 +-
.../configuration/enums/OutputType.scala | 8 +-
.../configuration/enums/ProcessType.scala | 6 +-
.../measure/configuration/enums/SinkType.scala | 19 +-
.../griffin/measure/context/DQContext.scala | 11 +-
.../measure/context/DataFrameCache.scala | 12 +-
.../griffin/measure/context/TableRegister.scala | 8 +-
.../checkpoint/lock/CheckpointLockInZK.scala | 6 +-
.../offset/OffsetCheckpointClient.scala | 2 +-
.../offset/OffsetCheckpointFactory.scala | 5 +-
.../offset/OffsetCheckpointInZK.scala | 29 ++-
.../streaming/metric/AccuracyMetric.scala | 8 +-
.../context/streaming/metric/CacheResults.scala | 6 +-
.../griffin/measure/datasource/DataSource.scala | 11 +-
.../measure/datasource/DataSourceFactory.scala | 8 +-
.../measure/datasource/TimestampStorage.scala | 29 ++-
.../datasource/cache/StreamingCacheClient.scala | 65 ++---
.../cache/StreamingCacheClientFactory.scala | 18 +-
.../cache/StreamingCacheJsonClient.scala | 3 +-
.../cache/StreamingCacheOrcClient.scala | 3 +-
.../cache/StreamingCacheParquetClient.scala | 10 +-
.../cache/StreamingOffsetCacheable.scala | 18 +-
.../measure/datasource/cache/WithFanIn.scala | 8 +-
.../datasource/connector/DataConnector.scala | 15 +-
.../connector/DataConnectorFactory.scala | 35 ++-
.../batch/AvroBatchDataConnector.scala | 6 +-
.../batch/HiveBatchDataConnector.scala | 6 +-
.../batch/TextDirBatchDataConnector.scala | 18 +-
.../streaming/KafkaStreamingDataConnector.scala | 6 +-
.../KafkaStreamingStringDataConnector.scala | 12 +-
.../streaming/StreamingDataConnector.scala | 10 +-
.../measure/job/builder/DQJobBuilder.scala | 2 -
.../apache/griffin/measure/launch/DQApp.scala | 3 +-
.../measure/launch/batch/BatchDQApp.scala | 10 +-
.../launch/streaming/StreamingDQApp.scala | 19 +-
.../griffin/measure/sink/ConsoleSink.scala | 3 +-
.../measure/sink/ElasticSearchSink.scala | 15 +-
.../apache/griffin/measure/sink/HdfsSink.scala | 17 +-
.../apache/griffin/measure/sink/MongoSink.scala | 10 +-
.../org/apache/griffin/measure/sink/Sink.scala | 3 +-
.../griffin/measure/sink/SinkFactory.scala | 3 +-
.../griffin/measure/sink/SinkTaskRunner.scala | 18 +-
.../measure/step/builder/DQStepBuilder.scala | 10 +-
.../step/builder/GriffinDslDQStepBuilder.scala | 4 +-
.../step/builder/RuleParamStepBuilder.scala | 4 +-
.../builder/dsl/expr/ClauseExpression.scala | 14 +-
.../step/builder/dsl/expr/LogicalExpr.scala | 5 +-
.../step/builder/dsl/expr/SelectExpr.scala | 8 +-
.../step/builder/dsl/expr/TreeNode.scala | 18 +-
.../step/builder/dsl/parser/BasicParser.scala | 6 +-
.../builder/dsl/parser/GriffinDslParser.scala | 5 +-
.../dsl/transform/AccuracyExpr2DQSteps.scala | 55 +++--
.../transform/CompletenessExpr2DQSteps.scala | 40 +--
.../transform/DistinctnessExpr2DQSteps.scala | 48 ++--
.../builder/dsl/transform/Expr2DQSteps.scala | 2 +-
.../dsl/transform/ProfilingExpr2DQSteps.scala | 16 +-
.../dsl/transform/TimelinessExpr2DQSteps.scala | 46 ++--
.../dsl/transform/UniquenessExpr2DQSteps.scala | 34 ++-
.../transform/analyzer/AccuracyAnalyzer.scala | 6 +-
.../analyzer/CompletenessAnalyzer.scala | 3 +-
.../analyzer/DistinctnessAnalyzer.scala | 4 +-
.../transform/analyzer/ProfilingAnalyzer.scala | 3 +-
.../transform/analyzer/UniquenessAnalyzer.scala | 3 +-
.../builder/preproc/PreProcParamMaker.scala | 8 +-
.../griffin/measure/step/read/ReadStep.scala | 9 +-
.../measure/step/read/UnionReadStep.scala | 3 +-
.../measure/step/transform/DataFrameOps.scala | 16 +-
.../transform/DataFrameOpsTransformStep.scala | 7 +-
.../step/transform/SparkSqlTransformStep.scala | 3 +-
.../step/write/DataSourceUpdateWriteStep.scala | 19 +-
.../measure/step/write/MetricFlushStep.scala | 3 +-
.../measure/step/write/MetricWriteStep.scala | 17 +-
.../measure/step/write/RecordWriteStep.scala | 38 ++-
.../measure/step/write/SparkRowFormatter.scala | 6 +-
.../apache/griffin/measure/utils/FSUtil.scala | 14 +-
.../apache/griffin/measure/utils/HdfsUtil.scala | 30 +--
.../apache/griffin/measure/utils/HttpUtil.scala | 24 +-
.../apache/griffin/measure/utils/JsonUtil.scala | 5 +-
.../griffin/measure/utils/ParamUtil.scala | 8 +-
.../apache/griffin/measure/utils/TimeUtil.scala | 18 +-
scalastyle-config.xml | 246 +++++++++++++++++++
88 files changed, 887 insertions(+), 510 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index e7df806..1bac17b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -18,15 +18,16 @@ under the License.
*/
package org.apache.griffin.measure
-import org.apache.griffin.measure.configuration.enums._
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.dqdefinition.{GriffinConfig, DQConfig, EnvConfig, Param}
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp
-import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
/**
* application entrance
@@ -49,17 +50,15 @@ object Application extends Loggable {
// read param files
val envParam = readParamFile[EnvConfig](envParamFile) match {
case Success(p) => p
- case Failure(ex) => {
+ case Failure(ex) =>
error(ex.getMessage)
sys.exit(-2)
- }
}
val dqParam = readParamFile[DQConfig](dqParamFile) match {
case Success(p) => p
- case Failure(ex) => {
+ case Failure(ex) =>
error(ex.getMessage)
sys.exit(-2)
- }
}
val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
@@ -68,32 +67,28 @@ object Application extends Loggable {
val dqApp: DQApp = procType match {
case BatchProcessType => BatchDQApp(allParam)
case StreamingProcessType => StreamingDQApp(allParam)
- case _ => {
+ case _ =>
error(s"${procType} is unsupported process type!")
sys.exit(-4)
- }
}
startup
// dq app init
dqApp.init match {
- case Success(_) => {
+ case Success(_) =>
info("process init success")
- }
- case Failure(ex) => {
+ case Failure(ex) =>
error(s"process init error: ${ex.getMessage}")
shutdown
sys.exit(-5)
- }
}
// dq app run
dqApp.run match {
- case Success(_) => {
+ case Success(_) =>
info("process run success")
- }
- case Failure(ex) => {
+ case Failure(ex) =>
error(s"process run error: ${ex.getMessage}")
if (dqApp.retryable) {
@@ -102,19 +97,16 @@ object Application extends Loggable {
shutdown
sys.exit(-5)
}
- }
}
// dq app end
dqApp.close match {
- case Success(_) => {
+ case Success(_) =>
info("process end success")
- }
- case Failure(ex) => {
+ case Failure(ex) =>
error(s"process end error: ${ex.getMessage}")
shutdown
sys.exit(-5)
- }
}
shutdown
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 4943329..b281481 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition
-import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
+
import org.apache.griffin.measure.configuration.enums._
/**
@@ -46,7 +47,7 @@ case class DQConfig(@JsonProperty("name") private val name: String,
def getDataSources: Seq[DataSourceParam] = {
dataSources.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
val (seq, names) = ret
- if (!names.contains(ds.getName)){
+ if (!names.contains(ds.getName)) {
(seq :+ ds, names + ds.getName)
} else ret
}._1
@@ -133,8 +134,9 @@ case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[Rul
* rule param
* @param dslType dsl type of this rule (must)
* @param dqType dq type of this rule (must if dsl type is "griffin-dsl")
- * @param inDfName name of input dataframe of this rule, by default will be the previous rule output dataframe name
- * @param outDfName name of output dataframe of this rule, by default will be generated as data connector dataframe name with index suffix
+ * @param inDfName name of input dataframe of this rule, by default will be the previous rule output dataframe name
+ * @param outDfName name of output dataframe of this rule, by default will be generated
+ * as data connector dataframe name with index suffix
* @param rule rule to define dq step calculation (must)
* @param details detail config of rule (optional)
* @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-ops" mode)
@@ -206,4 +208,4 @@ case class RuleOutputParam( @JsonProperty("type") private val outputType: String
def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten)) FlattenType(flatten) else FlattenType("")
def validate(): Unit = {}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index bf77d13..2e227a3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition
-import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
+
import org.apache.griffin.measure.configuration.enums._
/**
@@ -109,4 +110,4 @@ case class CheckpointParam(@JsonProperty("type") private val cpType: String,
def validate(): Unit = {
assert(StringUtils.isNotBlank(cpType), "griffin checkpoint type should not be empty")
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
index 4a4aeed..2739f74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
@@ -18,11 +18,13 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition.reader
+import scala.reflect.ClassTag
+import scala.util.Try
+
import org.apache.griffin.measure.configuration.dqdefinition.Param
import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
-import scala.reflect.ClassTag
-import scala.util.Try
+
/**
* read params from config file path
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
index 063dc7e..ba51d5f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition.reader
+import scala.reflect.ClassTag
+import scala.util.Try
+
import org.apache.griffin.measure.configuration.dqdefinition.Param
import org.apache.griffin.measure.utils.JsonUtil
-import scala.reflect.ClassTag
-import scala.util.Try
/**
* read params from json string directly
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
index 9a9a46c..5c914c6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.configuration.dqdefinition.reader
+import scala.reflect.ClassTag
+import scala.util.Try
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.Param
-import scala.reflect.ClassTag
-import scala.util.Try
trait ParamReader extends Loggable with Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
index cee8d98..bbeb04f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
@@ -31,7 +31,13 @@ sealed trait DqType {
object DqType {
private val dqTypes: List[DqType] = List(
- AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, CompletenessType, UnknownType
+ AccuracyType,
+ ProfilingType,
+ UniquenessType,
+ DistinctnessType,
+ TimelinessType,
+ CompletenessType,
+ UnknownType
)
def apply(ptn: String): DqType = {
dqTypes.find(dqType => ptn match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
index 160ecaf..cb2fba1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
@@ -29,7 +29,13 @@ sealed trait FlattenType {
}
object FlattenType {
- private val flattenTypes: List[FlattenType] = List(DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType)
+ private val flattenTypes: List[FlattenType] = List(
+ DefaultFlattenType,
+ EntriesFlattenType,
+ ArrayFlattenType,
+ MapFlattenType
+ )
+
val default = DefaultFlattenType
def apply(ptn: String): FlattenType = {
flattenTypes.find(tp => ptn match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
index 5b1d261..8a80044 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
@@ -29,7 +29,13 @@ sealed trait OutputType {
}
object OutputType {
- private val outputTypes: List[OutputType] = List(MetricOutputType, RecordOutputType, DscUpdateOutputType, UnknownOutputType)
+ private val outputTypes: List[OutputType] = List(
+ MetricOutputType,
+ RecordOutputType,
+ DscUpdateOutputType,
+ UnknownOutputType
+ )
+
val default = UnknownOutputType
def apply(ptn: String): OutputType = {
outputTypes.find(tp => ptn match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
index 4f799ed..3cbc749 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
@@ -29,7 +29,11 @@ sealed trait ProcessType {
}
object ProcessType {
- private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType)
+ private val procTypes: List[ProcessType] = List(
+ BatchProcessType,
+ StreamingProcessType
+ )
+
def apply(ptn: String): ProcessType = {
procTypes.find(tp => ptn match {
case tp.idPattern() => true
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 5471e83..d9e5d2b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -30,15 +30,22 @@ sealed trait SinkType {
object SinkType {
private val sinkTypes: List[SinkType] = List(
- ConsoleSinkType, HdfsSinkType, ElasticsearchSinkType, MongoSinkType, UnknownSinkType
+ ConsoleSinkType,
+ HdfsSinkType,
+ ElasticsearchSinkType,
+ MongoSinkType,
+ UnknownSinkType
)
+
def apply(ptn: String): SinkType = {
sinkTypes.find(tp => ptn match {
case tp.idPattern() => true
case _ => false
}).getOrElse(UnknownSinkType)
}
+
def unapply(pt: SinkType): Option[String] = Some(pt.desc)
+
def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
val seq = strs.map(s => SinkType(s)).filter(_ != UnknownSinkType).distinct
if (seq.size > 0) seq else Seq(ElasticsearchSinkType)
@@ -48,7 +55,7 @@ object SinkType {
/**
* console sink, will sink metric in console
*/
- case object ConsoleSinkType extends SinkType {
+case object ConsoleSinkType extends SinkType {
val idPattern = "^(?i)console|log$".r
val desc = "console"
}
@@ -56,7 +63,7 @@ object SinkType {
/**
* hdfs sink, will sink metric and record in hdfs
*/
- case object HdfsSinkType extends SinkType {
+case object HdfsSinkType extends SinkType {
val idPattern = "^(?i)hdfs$".r
val desc = "hdfs"
}
@@ -64,7 +71,7 @@ object SinkType {
/**
* elasticsearch sink, will sink metric in elasticsearch
*/
- case object ElasticsearchSinkType extends SinkType {
+case object ElasticsearchSinkType extends SinkType {
val idPattern = "^(?i)es|elasticsearch|http$".r
val desc = "elasticsearch"
}
@@ -72,12 +79,12 @@ object SinkType {
/**
* mongo sink, will sink metric in mongo db
*/
- case object MongoSinkType extends SinkType {
+case object MongoSinkType extends SinkType {
val idPattern = "^(?i)mongo|mongodb$".r
val desc = "distinct"
}
- case object UnknownSinkType extends SinkType {
+case object UnknownSinkType extends SinkType {
val idPattern = "".r
val desc = "unknown"
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index a9f3da0..b0759c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.context
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.spark.sql.{Encoders, SparkSession, SQLContext}
+
import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.datasource._
import org.apache.griffin.measure.sink.{Sink, SinkFactory}
-import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
/**
* dq context: the context of each calculation
@@ -58,6 +59,7 @@ case class DQContext(contextId: ContextId,
}
}
dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
+
def getDataSourceName(index: Int): String = {
if (dataSourceNames.size > index) dataSourceNames(index) else ""
}
@@ -66,20 +68,25 @@ case class DQContext(contextId: ContextId,
val functionNames: Seq[String] = sparkSession.catalog.listFunctions.map(_.name).collect.toSeq
val dataSourceTimeRanges = loadDataSources()
+
def loadDataSources(): Map[String, TimeRange] = {
dataSources.map { ds =>
(ds.name, ds.loadData(this))
}.toMap
}
+
printTimeRanges
private val sinkFactory = SinkFactory(sinkParams, name)
private val defaultSink: Sink = createSink(contextId.timestamp)
+
def getSink(timestamp: Long): Sink = {
if (timestamp == contextId.timestamp) getSink()
else createSink(timestamp)
}
+
def getSink(): Sink = defaultSink
+
private def createSink(t: Long): Sink = {
procType match {
case BatchProcessType => sinkFactory.getSinks(t, true)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
index 4e8f4df..0671565 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
@@ -18,11 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.context
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Map => MutableMap, MutableList}
+
import org.apache.spark.sql.DataFrame
-import scala.collection.concurrent.{Map => ConcMap}
-import scala.collection.mutable.{MutableList, Map => MutableMap}
+import org.apache.griffin.measure.Loggable
/**
* cache and unpersist dataframes
@@ -42,17 +42,15 @@ case class DataFrameCache() extends Loggable {
def cacheDataFrame(name: String, df: DataFrame): Unit = {
info(s"try to cache data frame ${name}")
dataFrames.get(name) match {
- case Some(odf) => {
+ case Some(odf) =>
trashDataFrame(odf)
dataFrames += (name -> df)
df.cache
info("cache after replace old df")
- }
- case _ => {
+ case _ =>
dataFrames += (name -> df)
df.cache
info("cache after replace no old df")
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
index 8d86170..c4dda3b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
@@ -18,10 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.context
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Set => MutableSet}
+
import org.apache.spark.sql._
-import scala.collection.mutable.{Set => MutableSet}
+import org.apache.griffin.measure.Loggable
+
/**
* register table name
@@ -78,4 +80,4 @@ case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends Table
tables.clear
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
index b1cbe0f..d78aedf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
@@ -32,10 +32,9 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
mutex.acquire(-1, null)
}
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"lock error: ${e.getMessage}")
false
- }
}
}
@@ -44,9 +43,8 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
try {
if (mutex.isAcquiredInThisProcess) mutex.release
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"unlock error: ${e.getMessage}")
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
index 8acfbeb..48337d5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.context.streaming.checkpoint.lock.{CheckpointL
object OffsetCheckpointClient extends OffsetCheckpoint with OffsetOps {
var offsetCheckpoints: Seq[OffsetCheckpoint] = Nil
- def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) = {
+ def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) : Unit = {
val fac = OffsetCheckpointFactory(checkpointParams, metricName)
offsetCheckpoints = checkpointParams.flatMap(param => fac.getOffsetCheckpoint(param)).toList
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
index 5fe8e15..f19542a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.context.streaming.checkpoint.offset
+import scala.util.Try
+
import org.apache.griffin.measure.configuration.dqdefinition.CheckpointParam
-import scala.util.Try
case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam], metricName: String
) extends Serializable {
@@ -36,4 +37,4 @@ case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam],
offsetCheckpointTry.toOption
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
index e051779..b575573 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
@@ -18,22 +18,24 @@ under the License.
*/
package org.apache.griffin.measure.context.streaming.checkpoint.offset
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
import org.apache.zookeeper.CreateMode
-
import scala.collection.JavaConverters._
+import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
+
+
/**
* leverage zookeeper for info cache
* @param config
* @param metricName
*/
-case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) extends OffsetCheckpoint with OffsetOps {
+case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String)
+ extends OffsetCheckpoint with OffsetOps {
val Hosts = "hosts"
val Namespace = "namespace"
@@ -67,7 +69,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
}
val lockPath = config.getOrElse(LockPath, "lock").toString
- private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
+ private val cacheNamespace: String =
+ if (namespace.isEmpty) metricName else namespace + separator + metricName
+
private val builder = CuratorFrameworkFactory.builder()
.connectString(hosts)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
@@ -141,10 +145,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
try {
client.getChildren().forPath(path).asScala.toList
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"list ${path} warn: ${e.getMessage}")
Nil
- }
}
}
@@ -162,10 +165,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
.forPath(path, content.getBytes("utf-8"))
true
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
false
- }
}
}
@@ -174,10 +176,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
client.setData().forPath(path, content.getBytes("utf-8"))
true
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
false
- }
}
}
@@ -185,10 +186,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
try {
Some(new String(client.getData().forPath(path), "utf-8"))
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"read ${path} warn: ${e.getMessage}")
None
- }
}
}
@@ -204,10 +204,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
try {
client.checkExists().forPath(path) != null
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"check exists ${path} warn: ${e.getMessage}")
false
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
index e69716e..19dfb9e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
@@ -45,9 +45,11 @@ case class AccuracyMetric(miss: Long, total: Long) extends Metric {
(this.miss != other.miss) || (this.total != other.total)
}
- def getMiss = miss
- def getTotal = total
- def getMatch = total - miss
+ def getMiss: Long = miss
+
+ def getTotal: Long = total
+
+ def getMatch: Long = total - miss
def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
index cc8e772..6c99618 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.context.streaming.metric
+import scala.collection.mutable.{Map => MutableMap}
+
import org.apache.griffin.measure.Loggable
-import scala.collection.mutable.{Map => MutableMap}
/**
* in streaming mode, some metrics may update,
@@ -32,10 +33,9 @@ object CacheResults extends Loggable {
def olderThan(ut: Long): Boolean = updateTime < ut
def update(ut: Long, r: Metric): Option[Metric] = {
r match {
- case m: result.T if (olderThan(ut)) => {
+ case m: result.T if (olderThan(ut)) =>
val ur = result.update(m)
if (result.differsFrom(ur)) Some(ur) else None
- }
case _ => None
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index 6bf6373..f2cd0ec 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -18,13 +18,14 @@ under the License.
*/
package org.apache.griffin.measure.datasource
+import org.apache.spark.sql._
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.context.{DQContext, TimeRange}
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.DataConnector
import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.spark.sql._
/**
* data source
@@ -50,12 +51,10 @@ case class DataSource(name: String,
val timestamp = context.contextId.timestamp
val (dfOpt, timeRange) = data(timestamp)
dfOpt match {
- case Some(df) => {
+ case Some(df) =>
context.runTimeTableRegister.registerTable(name, df)
- }
- case None => {
+ case None =>
warn(s"load data source [${name}] fails")
- }
}
timeRange
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 7807dfd..28e616b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -18,14 +18,16 @@ under the License.
*/
package org.apache.griffin.measure.datasource
+import scala.util.Success
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-import scala.util.Success
object DataSourceFactory extends Loggable {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
index a305563..04a7f85 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
@@ -18,10 +18,9 @@ under the License.
*/
package org.apache.griffin.measure.datasource
-import org.apache.griffin.measure.Loggable
-
import scala.collection.mutable.{SortedSet => MutableSortedSet}
+import org.apache.griffin.measure.Loggable
/**
* tmst cache, CRUD of timestamps
*/
@@ -29,19 +28,19 @@ case class TimestampStorage() extends Loggable {
private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
- //-- insert tmst into tmst group --
- def insert(tmst: Long) = tmstGroup += tmst
- def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
+ // -- insert tmst into tmst group --
+ def insert(tmst: Long) : MutableSortedSet[Long] = tmstGroup += tmst
+ def insert(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup ++= tmsts
- //-- remove tmst from tmst group --
- def remove(tmst: Long) = tmstGroup -= tmst
- def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
+ // -- remove tmst from tmst group --
+ def remove(tmst: Long) : MutableSortedSet[Long] = tmstGroup -= tmst
+ def remove(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup --= tmsts
- //-- get subset of tmst group --
- def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
- def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 1).toSet
- def until(until: Long) = tmstGroup.until(until).toSet
- def from(from: Long) = tmstGroup.from(from).toSet
- def all = tmstGroup.toSet
+ // -- get subset of tmst group --
+ def fromUntil(from: Long, until: Long) : Set[Long] = tmstGroup.range(from, until).toSet
+ def afterTil(after: Long, til: Long) : Set[Long] = tmstGroup.range(after + 1, til + 1).toSet
+ def until(until: Long) : Set[Long] = tmstGroup.until(until).toSet
+ def from(from: Long) : Set[Long] = tmstGroup.from(from).toSet
+ def all : Set[Long] = tmstGroup.toSet
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index b351f82..a03a468 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.datasource.cache
import java.util.concurrent.TimeUnit
+import scala.util.Random
+
+import org.apache.spark.sql._
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
import org.apache.griffin.measure.utils.DataFrameUtil._
import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql._
-import scala.util.Random
/**
* data source cache in streaming mode
@@ -38,7 +40,8 @@ import scala.util.Random
* read data frame from hdfs in calculate phase
* with update and clean actions for the cache data
*/
-trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
+trait StreamingCacheClient
+ extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
val sqlContext: SQLContext
val param: Map[String, Any]
@@ -46,7 +49,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
val index: Int
val timestampStorage: TimestampStorage
- protected def fromUntilRangeTmsts(from: Long, until: Long) = timestampStorage.fromUntil(from, until)
+ protected def fromUntilRangeTmsts(from: Long, until: Long) =
+ timestampStorage.fromUntil(from, until)
+
protected def clearTmst(t: Long) = timestampStorage.remove(t)
protected def clearTmstsUntil(until: Long) = {
val outDateTmsts = timestampStorage.until(until)
@@ -67,17 +72,20 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
val filePath: String = param.getString(_FilePath, defFilePath)
val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
- val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
- val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+ val readyTimeInterval: Long =
+ TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+
+ val readyTimeDelay: Long =
+ TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+
val deltaTimeRange: (Long, Long) = {
def negative(n: Long): Long = if (n <= 0) n else 0
param.get(_TimeRange) match {
- case Some(seq: Seq[String]) => {
+ case Some(seq: Seq[String]) =>
val nseq = seq.flatMap(TimeUtil.milliseconds(_))
val ns = negative(nseq.headOption.getOrElse(0))
val ne = negative(nseq.tail.headOption.getOrElse(0))
(ns, ne)
- }
case _ => (0, 0)
}
}
@@ -112,7 +120,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
if (!readOnly) {
dfOpt match {
- case Some(df) => {
+ case Some(df) =>
// cache df
df.cache
@@ -137,10 +145,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
// uncache df
df.unpersist
- }
- case _ => {
+ case _ =>
info("no data frame to save")
- }
}
// submit cache time and ready time
@@ -168,7 +174,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
} else {
info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
- s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+
+ s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} " +
+ s"AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
}
// new cache data
@@ -176,10 +184,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
val dfr = sqlContext.read
readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"read data source cache warn: ${e.getMessage}")
None
- }
}
// old cache data
@@ -190,10 +197,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
val dfr = sqlContext.read
readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
} catch {
- case e: Throwable => {
+ case e: Throwable =>
warn(s"read old data source cache warn: ${e.getMessage}")
None
- }
}
}
@@ -228,12 +234,11 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
}
names.filter { name =>
name match {
- case regex(value) => {
+ case regex(value) =>
str2Long(value) match {
case Some(t) => func(t, bound)
case _ => false
}
- }
case _ => false
}
}.map(name => s"${path}/${name}")
@@ -258,7 +263,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
// new cache data
val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
newCacheCleanTime match {
- case Some(nct) => {
+ case Some(nct) =>
// clean calculated new cache data
val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
if (newCacheLocked) {
@@ -271,16 +276,15 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
newCacheLock.unlock()
}
}
- }
- case _ => {
+ case _ =>
// do nothing
- }
+ info("should not happen")
}
// old cache data
val oldCacheCleanTime = if (updatable) readCleanTime else None
oldCacheCleanTime match {
- case Some(oct) => {
+ case Some(oct) =>
val oldCacheIndexOpt = readOldCacheIndex
oldCacheIndexOpt.foreach { idx =>
val oldDfPath = s"${oldFilePath}/${idx}"
@@ -298,10 +302,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
}
}
}
- }
- case _ => {
+ case _ =>
// do nothing
- }
+ info("should not happen")
}
}
}
@@ -313,7 +316,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
def updateData(dfOpt: Option[DataFrame]): Unit = {
if (!readOnly && updatable) {
dfOpt match {
- case Some(df) => {
+ case Some(df) =>
// old cache lock
val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
if (oldCacheLocked) {
@@ -339,10 +342,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
oldCacheLock.unlock()
}
}
- }
- case _ => {
+ case _ =>
info("no data frame to update")
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index f991e2d..eeda8ef 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.datasource.cache
+import org.apache.spark.sql.SQLContext
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.SQLContext
object StreamingCacheClientFactory extends Loggable {
@@ -50,17 +51,20 @@ object StreamingCacheClientFactory extends Loggable {
try {
val tp = param.getString(_type, "")
val dsCache = tp match {
- case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
- case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
- case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
- case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ case ParquetRegex() =>
+ StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ case JsonRegex() =>
+ StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+ case OrcRegex() =>
+ StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+ case _ =>
+ StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
}
Some(dsCache)
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error("generate data source cache fails")
None
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
index a12ef87..c81d4d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.datasource.cache
-import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.spark.sql._
+import org.apache.griffin.measure.datasource.TimestampStorage
+
/**
* data source cache in json format
*/
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
index 63e7104..0649b74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.datasource.cache
-import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.spark.sql._
+import org.apache.griffin.measure.datasource.TimestampStorage
+
/**
* data source cache in orc format
*/
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
index c275227..9c369ee 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -18,14 +18,18 @@ under the License.
*/
package org.apache.griffin.measure.datasource.cache
-import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.spark.sql._
+import org.apache.griffin.measure.datasource.TimestampStorage
+
/**
* data source cache in parquet format
*/
-case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
- dsName: String, index: Int, timestampStorage: TimestampStorage
+case class StreamingCacheParquetClient(sqlContext: SQLContext,
+ param: Map[String, Any],
+ dsName: String,
+ index: Int,
+ timestampStorage: TimestampStorage
) extends StreamingCacheClient {
sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
index 7b7f506..e73a058 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
@@ -30,13 +30,13 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
val readyTimeInterval: Long
val readyTimeDelay: Long
- def selfCacheInfoPath = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
+ def selfCacheInfoPath : String = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
- def selfCacheTime = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
- def selfLastProcTime = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
- def selfReadyTime = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
- def selfCleanTime = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
- def selfOldCacheIndex = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
+ def selfCacheTime : String = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
+ def selfLastProcTime : String = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
+ def selfReadyTime : String = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
+ def selfCleanTime : String = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
+ def selfOldCacheIndex : String = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
protected def submitCacheTime(ms: Long): Unit = {
val map = Map[String, String]((selfCacheTime -> ms.toString))
@@ -80,9 +80,11 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
try {
Some(v.toLong)
} catch {
- case _:Throwable => error("try to read not existing value from OffsetCacheClient::readSelfInfo");None
+ case _: Throwable =>
+ error("try to read not existing value from OffsetCacheClient::readSelfInfo")
+ None
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
index 413b5a2..675f2f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.datasource.cache
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+import scala.collection.concurrent.{Map => ConcMap, TrieMap}
/**
* fan in trait, for multiple input and one output
@@ -55,14 +55,12 @@ trait WithFanIn[T] {
private def fanInc(key: T): Unit = {
fanInCountMap.get(key) match {
- case Some(n) => {
+ case Some(n) =>
val suc = fanInCountMap.replace(key, n, n + 1)
if (!suc) fanInc(key)
- }
- case _ => {
+ case _ =>
val oldOpt = fanInCountMap.putIfAbsent(key, 1)
if (oldOpt.nonEmpty) fanInc(key)
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index 05d3c75..ae6a18d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -20,16 +20,17 @@ package org.apache.griffin.measure.datasource.connector
import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.preproc.PreProcParamMaker
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions._
trait DataConnector extends Loggable with Serializable {
@@ -64,7 +65,8 @@ trait DataConnector extends Loggable with Serializable {
saveTmst(timestamp) // save timestamp
dfOpt.flatMap { df =>
- val (preProcRules, thisTable) = PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
+ val (preProcRules, thisTable) =
+ PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
// init data
context.compileTableRegister.registerTable(thisTable)
@@ -89,10 +91,9 @@ trait DataConnector extends Loggable with Serializable {
}
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
None
- }
}
}
}
@@ -108,4 +109,4 @@ object DataConnectorIdGenerator {
private def increment: Long = {
counter.incrementAndGet()
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index f4911fc..b51d4fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -18,17 +18,18 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector
+import scala.util.Try
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.datasource.connector.batch._
import org.apache.griffin.measure.datasource.connector.streaming._
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-import scala.reflect.ClassTag
-import scala.util.Try
object DataConnectorFactory extends Loggable {
@@ -60,9 +61,8 @@ object DataConnectorFactory extends Loggable {
case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
- case KafkaRegex() => {
+ case KafkaRegex() =>
getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- }
case _ => throw new Exception("connector creation error!")
}
}
@@ -78,7 +78,8 @@ object DataConnectorFactory extends Loggable {
val conType = dcParam.getType
val version = dcParam.getVersion
conType match {
- case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+ case KafkaRegex() =>
+ getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case _ => throw new Exception("streaming connector creation error!")
}
}
@@ -88,7 +89,7 @@ object DataConnectorFactory extends Loggable {
dcParam: DataConnectorParam,
tmstCache: TimestampStorage,
streamingCacheClientOpt: Option[StreamingCacheClient]
- ): KafkaStreamingDataConnector = {
+ ): KafkaStreamingDataConnector = {
val KeyType = "key.type"
val ValueType = "value.type"
val config = dcParam.getConfig
@@ -96,22 +97,14 @@ object DataConnectorFactory extends Loggable {
val valueType = config.getOrElse(ValueType, "java.lang.String").toString
(keyType, valueType) match {
- case ("java.lang.String", "java.lang.String") => {
- KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- }
- case _ => {
+ case ("java.lang.String", "java.lang.String") =>
+ KafkaStreamingStringDataConnector(
+ sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+ case _ =>
throw new Exception("not supported type kafka data connector")
- }
}
}
-// def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
-// connectors.flatMap { dc =>
-// dc match {
-// case mdc: T => Some(mdc)
-// case _ => None
-// }
-// }
-// }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index 7fa9080..bf71b2c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -58,10 +59,9 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"load avro file ${concreteFileFullPath} fails")
None
- }
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
index ab1e823..1df3bd7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -54,10 +55,9 @@ case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
None
- }
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 3dcab16..a7ab02e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.batch
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.TimeRange
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -60,7 +61,7 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
if (validDataDirs.nonEmpty) {
- val df = sparkSession.read.text(validDataDirs: _*)
+ val df = sparkSession.read.text(validDataDirs: _*)
val dfOpt = Some(df)
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
@@ -68,16 +69,17 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
None
}
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"load text dir ${dirPath} fails: ${e.getMessage}")
None
- }
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}
- private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {
+ private def listSubDirs(paths: Seq[String],
+ depth: Int,
+ filteFunc: (String) => Boolean): Seq[String] = {
val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
if (depth <= 0) {
subDirs.filter(filteFunc)
@@ -90,7 +92,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
- private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+ private def touchDone(dir: String): Unit =
+ HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
private def emptyDir(dir: String): Boolean = {
HdfsUtil.listSubPathsByType(dir, "file").filter(!_.startsWith(ignoreFilePrefix)).size == 0
@@ -98,7 +101,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
// def metaData(): Try[Iterable[(String, String)]] = {
// Try {
-// val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+// val st = sqlContext.read.format("com.databricks.spark.avro").
+ // load(concreteFileFullPath).schema
// st.fields.map(f => (f.name, f.dataType.typeName))
// }
// }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 1475898..ec09ffc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.streaming
+import scala.util.{Failure, Success, Try}
+
import kafka.serializer.Decoder
import org.apache.spark.streaming.dstream.InputDStream
-import scala.util.{Failure, Success, Try}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -64,10 +65,9 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
// pre-process
preProcess(dfOpt, ms)
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"streaming data connector error: ${e.getMessage}")
None
- }
}
// save data frame
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
index 8477fcf..ee5e497 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -19,16 +19,17 @@ under the License.
package org.apache.griffin.measure.datasource.connector.streaming
import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+
/**
* streaming data connector for kafka with string format key and value
*/
@@ -60,10 +61,9 @@ case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSessi
val df = sparkSession.createDataFrame(rowRdd, schema)
Some(df)
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error("streaming data transform fails")
None
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
index 5c2170c..323b0ac 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
@@ -18,14 +18,16 @@ under the License.
*/
package org.apache.griffin.measure.datasource.connector.streaming
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.datasource.connector.DataConnector
+import scala.util.Try
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.streaming.dstream.InputDStream
-import scala.util.Try
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.DataConnector
+
trait StreamingDataConnector extends DataConnector {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
index 249d6d2..0917919 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
@@ -18,11 +18,9 @@ under the License.
*/
package org.apache.griffin.measure.job.builder
-import org.apache.griffin.measure.configuration.enums.DslType
import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.job._
-import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.DQStepBuilder
import org.apache.griffin.measure.step.write.MetricFlushStep
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index b6cca98..92480d8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -18,10 +18,11 @@ under the License.
*/
package org.apache.griffin.measure.launch
+import scala.util.Try
+
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam}
-import scala.util.Try
/**
* dq application process
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index ba1f389..e2dbc8d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.launch.batch
import java.util.Date
-import org.apache.griffin.measure.configuration.enums._
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+
import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import scala.util.Try
case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index ceecb78..eb31a5e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -21,22 +21,24 @@ package org.apache.griffin.measure.launch.streaming
import java.util.{Date, Timer, TimerTask}
import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
-import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.context.streaming.metric.CacheResults
+import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import scala.util.Try
case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
@@ -82,10 +84,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
try {
createStreamingContext
} catch {
- case e: Throwable => {
+ case e: Throwable =>
error(s"create streaming context error: ${e.getMessage}")
throw e
- }
}
})
@@ -118,7 +119,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
ssc.start()
ssc.awaitTermination()
- ssc.stop(stopSparkContext=true, stopGracefully=true)
+ ssc.stop(stopSparkContext = true, stopGracefully = true)
// clean context
globalContext.clean()
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 306befe..feebd91 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -18,9 +18,10 @@ under the License.
*/
package org.apache.griffin.measure.sink
+import org.apache.spark.rdd.RDD
+
import org.apache.griffin.measure.utils.JsonUtil
import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.rdd.RDD
/**
* sink metric and record to console, for debug