You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/07/08 13:20:14 UTC
[griffin] branch master updated: build and run transform steps with
multiple threads
This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 62d755d build and run transform steps with multiple threads
62d755d is described below
commit 62d755d6f0474d4492d598d469e02c891d99bf95
Author: wankunde <wa...@163.com>
AuthorDate: Mon Jul 8 21:19:55 2019 +0800
build and run transform steps with multiple threads
Each DQJob contains a sequence of DQ steps, and those steps are run one by one (with the foldLeft function).
Steps that do not depend on each other can instead be run concurrently on multiple threads.
For example:
In a DQBatchJob, an accuracyExpr has four steps: __missRecords, __missCount, __totalCount and accu.
The __missCount and __totalCount steps can run at the same time.
A SeqDQStep therefore only needs to contain the root steps, not their dependency steps.
Because each step knows its dependency steps, it runs them first and then runs itself once they have completed.
Running step :
accu
| |---__missCount
| | |---__missRecords
| |---__totalCount
Running step :
__missCount
| |---__missRecords
Running step :
__missRecords
Running step :
__totalCount
Author: wankunde <wa...@163.com>
Closes #504 from wankunde/master.
---
.../org/apache/griffin/measure/step/DQStep.scala | 9 +
.../dsl/transform/AccuracyExpr2DQSteps.scala | 22 +-
.../dsl/transform/CompletenessExpr2DQSteps.scala | 15 +-
.../dsl/transform/DistinctnessExpr2DQSteps.scala | 28 ++-
.../dsl/transform/TimelinessExpr2DQSteps.scala | 7 +-
.../dsl/transform/UniquenessExpr2DQSteps.scala | 13 +-
.../step/transform/DataFrameOpsTransformStep.scala | 2 +-
.../step/transform/SparkSqlTransformStep.scala | 2 +-
.../measure/step/transform/TransformStep.scala | 79 +++++++
.../apache/griffin/measure/utils/ThreadUtils.scala | 227 +++++++++++++++++++++
.../griffin/measure/step/TransformStepTest.scala | 91 +++++++++
11 files changed, 457 insertions(+), 38 deletions(-)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
index 60c8477..6a50ebb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
@@ -33,3 +33,12 @@ trait DQStep extends Loggable {
def getNames(): Seq[String] = name :: Nil
}
+
+object DQStepStatus extends Enumeration {
+ val PENDING = Value
+ val RUNNING = Value
+ val COMPLETE = Value
+ val FAILED = Value
+}
+
+
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index f7ff3ef..31eef69 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -111,6 +111,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
s"FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
}
val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
+ missCountTransStep.parentSteps += missRecordsTransStep
// 3. total count
val totalCountTableName = "__totalCount"
@@ -151,6 +152,8 @@ case class AccuracyExpr2DQSteps(context: DQContext,
""".stripMargin
}
val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
+ accuracyTransStep.parentSteps += missCountTransStep
+ accuracyTransStep.parentSteps += totalCountTransStep
val accuracyMetricWriteSteps = procType match {
case BatchProcessType =>
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
@@ -160,14 +163,12 @@ case class AccuracyExpr2DQSteps(context: DQContext,
case StreamingProcessType => Nil
}
- // accuracy current steps
- val transSteps1 = missRecordsTransStep :: missCountTransStep :: totalCountTransStep :: accuracyTransStep :: Nil
- val writeSteps1 =
+ val batchWriteSteps =
accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
- // streaming extra steps
- val (transSteps2, writeSteps2) = procType match {
- case BatchProcessType => (Nil, Nil)
+ procType match {
+ case BatchProcessType => accuracyTransStep :: batchWriteSteps
+ // streaming extra steps
case StreamingProcessType =>
// 5. accuracy metric merge
val accuracyMetricTableName = "__accuracy"
@@ -179,6 +180,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
)
val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
+ accuracyMetricTransStep.parentSteps += accuracyTransStep
val accuracyMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
@@ -196,6 +198,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
}
val accuracyRecordTransStep = SparkSqlTransformStep(
accuracyRecordTableName, accuracyRecordSql, emptyMap)
+ accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
val accuracyRecordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -205,12 +208,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
}
// extra steps
- (accuracyMetricTransStep :: accuracyRecordTransStep :: Nil,
- accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil)
+ val streamingWriteSteps = accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil
+ accuracyRecordTransStep :: batchWriteSteps ++ streamingWriteSteps
}
-
- // full steps
- transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2
}
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 87cfa86..3df4a12 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -93,6 +93,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val incompleteRecordTransStep =
SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
+ incompleteRecordTransStep.parentSteps += sourceAliasTransStep
val incompleteRecordWriteStep = {
val rwName =
ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -112,6 +113,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
}
val incompleteCountTransStep =
SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)
+ incompleteCountTransStep.parentSteps += incompleteRecordTransStep
// 4. total count
val totalCountTableName = "__totalCount"
@@ -124,6 +126,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
s"FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
}
val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
+ totalCountTransStep.parentSteps += sourceAliasTransStep
// 5. complete metric
val completeTableName = ruleParam.getOutDfName()
@@ -147,6 +150,8 @@ case class CompletenessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
+ completeTransStep.parentSteps += incompleteCountTransStep
+ completeTransStep.parentSteps += totalCountTransStep
val completeWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
@@ -154,14 +159,8 @@ case class CompletenessExpr2DQSteps(context: DQContext,
MetricWriteStep(mwName, completeTableName, flattenType)
}
- val transSteps = {
- sourceAliasTransStep :: incompleteRecordTransStep ::
- incompleteCountTransStep :: totalCountTransStep ::
- completeTransStep :: Nil
- }
- val writeSteps = {
- incompleteRecordWriteStep :: completeWriteStep :: Nil
- }
+ val transSteps = completeTransStep :: Nil
+ val writeSteps = incompleteRecordWriteStep :: completeWriteStep :: Nil
// full steps
transSteps ++ writeSteps
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 70fee6c..0e2b10e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -111,6 +111,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
}
val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
+ totalTransStep.parentSteps += sourceAliasTransStep
val totalMetricWriteStep = {
MetricWriteStep(totalColName, totalTableName, EntriesFlattenType, writeTimestampOpt)
}
@@ -128,8 +129,9 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val selfGroupTransStep =
SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+ selfGroupTransStep.parentSteps += sourceAliasTransStep
- val transSteps1 = sourceAliasTransStep :: totalTransStep :: selfGroupTransStep :: Nil
+ val transSteps1 = totalTransStep :: selfGroupTransStep :: Nil
val writeSteps1 = totalMetricWriteStep :: Nil
val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
@@ -163,6 +165,8 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)
+ joinedTransStep.parentSteps += selfGroupTransStep
+ joinedTransStep.parentSteps += olderAliasTransStep
// 6. group by joined data
val groupTableName = "__group"
@@ -176,6 +180,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap)
+ groupTransStep.parentSteps += joinedTransStep
// 7. final duplicate count
val finalDupCountTableName = "__finalDupCount"
@@ -204,12 +209,13 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val finalDupCountTransStep =
SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+ finalDupCountTransStep.parentSteps += groupTransStep
- ((olderAliasTransStep :: joinedTransStep
- :: groupTransStep :: finalDupCountTransStep :: Nil,
- targetDsUpdateWriteStep :: Nil), finalDupCountTableName)
+ ((finalDupCountTransStep :: Nil, targetDsUpdateWriteStep :: Nil),
+ finalDupCountTableName)
case _ =>
- ((Nil, Nil), selfGroupTableName)
+ ((selfGroupTransStep :: Nil, totalMetricWriteStep :: Nil),
+ selfGroupTableName)
}
// 8. distinct metric
@@ -262,6 +268,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val rnTransStep = SparkSqlTransformStep(rnTableName, rnSql, emptyMap)
+ rnTransStep.parentSteps += informedTransStep
// 11. recognize duplicate items
val dupItemsTableName = "__dupItems"
@@ -272,6 +279,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
+ dupItemsTransStep.parentSteps += rnTransStep
val dupItemsWriteStep = {
val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
@@ -289,6 +297,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val groupDupMetricTransStep =
SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
+ groupDupMetricTransStep.parentSteps += dupItemsTransStep
val groupDupMetricWriteStep = {
MetricWriteStep(duplicationArrayName,
groupDupMetricTableName,
@@ -296,9 +305,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
writeTimestampOpt)
}
- val msteps = {
- informedTransStep :: rnTransStep :: dupItemsTransStep :: groupDupMetricTransStep :: Nil
- }
+ val msteps = groupDupMetricTransStep :: Nil
val wsteps = if (recordEnable) {
dupItemsWriteStep :: groupDupMetricWriteStep :: Nil
} else {
@@ -344,6 +351,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
+ dupMetricTransStep.parentSteps += dupRecordTransStep
val dupMetricWriteStep = {
MetricWriteStep(
duplicationArrayName,
@@ -353,9 +361,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
)
}
- val msteps = {
- dupRecordTransStep :: dupMetricTransStep :: Nil
- }
+ val msteps = dupMetricTransStep :: Nil
val wsteps = if (recordEnable) {
dupRecordWriteStep :: dupMetricWriteStep :: Nil
} else {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 71eb452..03c2c8d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -105,6 +105,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
s"FROM `${inTimeTableName}`"
}
val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
+ latencyTransStep.parentSteps += inTimeTransStep
// 3. timeliness metric
val metricTableName = ruleParam.getOutDfName()
@@ -129,6 +130,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
+ metricTransStep.parentSteps += latencyTransStep
val metricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
@@ -137,7 +139,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
}
// current steps
- val transSteps1 = inTimeTransStep :: latencyTransStep :: metricTransStep :: Nil
+ val transSteps1 = metricTransStep :: Nil
val writeSteps1 = metricWriteStep :: Nil
// 4. timeliness record
@@ -190,11 +192,12 @@ case class TimelinessExpr2DQSteps(context: DQContext,
}
val rangeMetricTransStep =
SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+ rangeMetricTransStep.parentSteps += rangeTransStep
val rangeMetricWriteStep = {
MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
}
- (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
+ (rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
case _ => (Nil, Nil)
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 28e9d48..7f259ea 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -64,7 +64,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
warn(s"[${timestamp}] data source ${sourceName} not exists")
Nil
} else if (!context.runTimeTableRegister.existsTable(targetName)) {
- println(s"[${timestamp}] data source ${targetName} not exists")
+ warn(s"[${timestamp}] data source ${targetName} not exists")
Nil
} else {
val selItemsClause = analyzer.selectionPairs.map { pair =>
@@ -104,6 +104,8 @@ case class UniquenessExpr2DQSteps(context: DQContext,
s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
}
val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)
+ joinedTransStep.parentSteps += sourceTransStep
+ joinedTransStep.parentSteps += targetTransStep
// 4. group
val groupTableName = "__group"
@@ -116,6 +118,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
}
val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true)
+ groupTransStep.parentSteps += joinedTransStep
// 5. total metric
val totalTableName = "__totalMetric"
@@ -138,6 +141,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
}
val uniqueRecordTransStep =
SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
+ uniqueRecordTransStep.parentSteps += groupTransStep
// 7. unique metric
val uniqueTableName = "__uniqueMetric"
@@ -152,12 +156,12 @@ case class UniquenessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
+ uniqueTransStep.parentSteps += uniqueRecordTransStep
val uniqueMetricWriteStep =
MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
- val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep ::
- totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil
+ val transSteps1 = totalTransStep :: uniqueTransStep :: Nil
val writeSteps1 = totalMetricWriteStep :: uniqueMetricWriteStep :: Nil
val duplicationArrayName = details.getString(_duplicationArray, "")
@@ -198,11 +202,12 @@ case class UniquenessExpr2DQSteps(context: DQContext,
""".stripMargin
}
val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
+ dupMetricTransStep.parentSteps += dupRecordTransStep
val dupMetricWriteStep = {
MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType)
}
- (dupRecordTransStep :: dupMetricTransStep :: Nil,
+ (dupMetricTransStep :: Nil,
dupRecordWriteStep :: dupMetricWriteStep :: Nil)
} else (Nil, Nil)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 4ac35b2..b07595a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -30,7 +30,7 @@ case class DataFrameOpsTransformStep(name: String,
cache: Boolean = false
) extends TransformStep {
- def execute(context: DQContext): Boolean = {
+ def doExecute(context: DQContext): Boolean = {
val sqlContext = context.sqlContext
try {
val df = rule match {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index 39b6a0e..59ea822 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -29,7 +29,7 @@ case class SparkSqlTransformStep(name: String,
cache: Boolean = false
) extends TransformStep {
- def execute(context: DQContext): Boolean = {
+ def doExecute(context: DQContext): Boolean = {
val sqlContext = context.sqlContext
try {
val df = sqlContext.sql(rule)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
index 995ce49..8c094b2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
@@ -18,7 +18,15 @@ under the License.
*/
package org.apache.griffin.measure.step.transform
+import scala.collection.mutable.HashSet
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.DQStepStatus._
+import org.apache.griffin.measure.utils.ThreadUtils
trait TransformStep extends DQStep {
@@ -28,4 +36,75 @@ trait TransformStep extends DQStep {
val cache: Boolean
+ var status = PENDING
+
+ val parentSteps = new HashSet[TransformStep]
+
+ def doExecute(context: DQContext): Boolean
+
+ def execute(context: DQContext): Boolean = {
+ val threadName = Thread.currentThread().getName
+ info(threadName + " bigin transform step : \n" + debugString())
+ // Submit parents Steps
+ val parentStepFutures = parentSteps.filter(checkAndUpdateStatus).map { parentStep =>
+ Future {
+ val result = parentStep.execute(context)
+ parentStep.synchronized {
+ if (result) {
+ parentStep.status = COMPLETE
+ } else {
+ parentStep.status = FAILED
+ }
+ }
+ }(TransformStep.transformStepContext)
+ }
+ ThreadUtils.awaitResult(
+ Future.sequence(parentStepFutures)(implicitly, TransformStep.transformStepContext),
+ Duration.Inf)
+
+ parentSteps.map(step => {
+ while (step.status == RUNNING) {
+ Thread.sleep(1000L)
+ }
+ })
+ val prepared = parentSteps.foldLeft(true)((ret, step) => ret && step.status == COMPLETE)
+ if (prepared) {
+ val res = doExecute(context)
+ info(threadName + " end transform step : \n" + debugString())
+ res
+ } else {
+ error("Parent transform step failed!")
+ false
+ }
+ }
+
+ def checkAndUpdateStatus(step: TransformStep): Boolean = {
+ step.synchronized {
+ if (step.status == PENDING) {
+ step.status = RUNNING
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ def debugString(level: Int = 0): String = {
+ val stringBuffer = new StringBuilder
+ if (level > 0) {
+ for (i <- 0 to level - 1) {
+ stringBuffer.append("| ")
+ }
+ stringBuffer.append("|---")
+ }
+ stringBuffer.append(name + "\n")
+ parentSteps.foreach(parentStep => stringBuffer.append(parentStep.debugString(level + 1)))
+ stringBuffer.toString()
+ }
}
+
+object TransformStep {
+ private[transform] val transformStepContext = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonCachedThreadPool("transform-step"))
+}
+
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
new file mode 100644
index 0000000..d484ec9
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
@@ -0,0 +1,227 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import java.util.concurrent._
+
+import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.duration.Duration
+import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
+import scala.util.control.NonFatal
+
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+
+private[griffin] object ThreadUtils {
+
+ private val sameThreadExecutionContext =
+ ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor())
+
+ /**
+ * An `ExecutionContextExecutor` that runs each task in the thread that invokes `execute/submit`.
+ * The caller should make sure the tasks running in this `ExecutionContextExecutor` are short and
+ * never block.
+ */
+ def sameThread: ExecutionContextExecutor = sameThreadExecutionContext
+
+ /**
+ * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
+ */
+ def namedThreadFactory(prefix: String): ThreadFactory = {
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
+ }
+
+ /**
+ * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+ * unique, sequentially assigned integer.
+ */
+ def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+ val threadFactory = namedThreadFactory(prefix)
+ Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+ }
+
+ /**
+ * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
+ * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
+ */
+ def newDaemonCachedThreadPool(
+ prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
+ val threadFactory = namedThreadFactory(prefix)
+ val threadPool = new ThreadPoolExecutor(
+ maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+ maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+ keepAliveSeconds,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue[Runnable],
+ threadFactory)
+ threadPool.allowCoreThreadTimeOut(true)
+ threadPool
+ }
+
+ /**
+ * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+ * unique, sequentially assigned integer.
+ */
+ def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+ val threadFactory = namedThreadFactory(prefix)
+ Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+ }
+
+ /**
+ * Wrapper over newSingleThreadExecutor.
+ */
+ def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
+ val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+ Executors.newSingleThreadExecutor(threadFactory)
+ }
+
+ /**
+ * Wrapper over ScheduledThreadPoolExecutor.
+ */
+ def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
+ val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+ val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
+ // By default, a cancelled task is not automatically removed from the work queue until its delay
+ // elapses. We have to enable it manually.
+ executor.setRemoveOnCancelPolicy(true)
+ executor
+ }
+
+ /**
+ * Run a piece of code in a new thread and return the result. Exception in the new thread is
+ * thrown in the caller thread with an adjusted stack trace that removes references to this
+ * method for clarity. The exception stack traces will be like the following
+ *
+ * SomeException: exception-message
+ * at CallerClass.body-method (sourcefile.scala)
+ * at ... run in separate thread using org.apache.griffin.measure.utils.ThreadUtils ... ()
+ * at CallerClass.caller-method (sourcefile.scala)
+ * ...
+ */
+ def runInNewThread[T](
+ threadName: String,
+ isDaemon: Boolean = true)(body: => T): T = {
+ @volatile var exception: Option[Throwable] = None
+ @volatile var result: T = null.asInstanceOf[T]
+
+ val thread = new Thread(threadName) {
+ override def run(): Unit = {
+ try {
+ result = body
+ } catch {
+ case NonFatal(e) =>
+ exception = Some(e)
+ }
+ }
+ }
+ thread.setDaemon(isDaemon)
+ thread.start()
+ thread.join()
+
+ exception match {
+ case Some(realException) =>
+ // Remove the part of the stack that shows method calls into this helper method
+ // This means drop everything from the top until the stack element
+ // ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
+ val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
+ ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
+
+ // Remove the part of the new thread stack that shows methods call from this helper method
+ val extraStackTrace = realException.getStackTrace.takeWhile(
+ ! _.getClassName.contains(this.getClass.getSimpleName))
+
+ // Combine the two stack traces, with a place holder just specifying that there
+ // was a helper method used, without any further details of the helper
+ val placeHolderStackElem = new StackTraceElement(
+ s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
+ " ", "", -1)
+ val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+
+ // Update the stack trace and rethrow the exception in the caller thread
+ realException.setStackTrace(finalStackTrace)
+ throw realException
+ case None =>
+ result
+ }
+ }
+
+ /**
+ * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
+ */
+ def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+ // Custom factory to set thread names
+ val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
+ override def newThread(pool: SForkJoinPool) =
+ new SForkJoinWorkerThread(pool) {
+ setName(prefix + "-" + super.getName)
+ }
+ }
+ new SForkJoinPool(maxThreadNumber, factory,
+ null, // handler
+ false // asyncMode
+ )
+ }
+
+ // scalastyle:off awaitresult
+ /**
+ * Preferred alternative to `Await.result()`.
+ *
+ * This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
+ * that this thread's stack trace appears in logs.
+ *
+ * In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
+ * `BlockingContext`. Codes running in the user's thread may be in a thread of Scala ForkJoinPool.
+ * As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
+ * method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
+ * In general, we should use this method because it's hard to debug when [[ThreadLocal]]s leak
+ * to other tasks.
+ */
+ @throws(classOf[Exception])
+ def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ try {
+ // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+ val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+ awaitable.result(atMost)(awaitPermission)
+ } catch {
+ // TimeoutException is thrown in the current thread, so not need to warp the exception.
+ case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
+ throw new Exception("Exception thrown in awaitResult: ", t)
+ }
+ }
+ // scalastyle:on awaitresult
+
+ // scalastyle:off awaitready
+ /**
+ * Preferred alternative to `Await.ready()`.
+ *
+ * @see [[awaitResult]]
+ */
+ @throws(classOf[Exception])
+ def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
+ try {
+ // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+ val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+ awaitable.ready(atMost)(awaitPermission)
+ } catch {
+ // TimeoutException is thrown in the current thread, so not need to warp the exception.
+ case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
+ throw new Exception("Exception thrown in awaitResult: ", t)
+ }
+ }
+ // scalastyle:on awaitready
+}
\ No newline at end of file
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
new file mode 100644
index 0000000..e640c45
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.context.ContextId
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.step.transform.TransformStep
+
+import org.scalatest._
+
+class TransformStepTest extends FlatSpec with Matchers with DataFrameSuiteBase with Loggable {
+
+ case class DualTransformStep(name: String,
+ duration: Int,
+ rule: String = "",
+ details: Map[String, Any] = Map(),
+ cache: Boolean = false
+ ) extends TransformStep {
+
+ def doExecute(context: DQContext): Boolean = {
+ val threadName = Thread.currentThread().getName
+ info(s"Step $name started with $threadName")
+ Thread.sleep(duration * 1000L)
+ info(s"Step $name finished with $threadName")
+ true
+ }
+ }
+
+ private def getDqContext(name: String = "test-context"): DQContext = {
+ DQContext(
+ ContextId(System.currentTimeMillis),
+ name,
+ Nil,
+ Nil,
+ BatchProcessType
+ )(spark)
+ }
+
+ /**
+ * Run transform steps in parallel. Here are the dependencies of transform steps
+ *
+ * step5
+ * | |---step2
+ * | | |---step1
+ * | |---step3
+ * | | |---step1
+ * | |---step4
+ *
+ * step1 : -->
+ * step2 : --->
+ * step3 : ---->
+ * step4 : ->
+ * step5 : -->
+ *
+ */
+ "transform step " should "be run steps in parallel" in {
+ val step1 = DualTransformStep("step1", 3)
+ val step2 = DualTransformStep("step2", 4)
+ step2.parentSteps += step1
+ val step3 = DualTransformStep("step3", 5)
+ step3.parentSteps += step1
+ val step4 = DualTransformStep("step4", 2)
+ val step5 = DualTransformStep("step5", 3)
+ step5.parentSteps += step2
+ step5.parentSteps += step3
+ step5.parentSteps += step4
+
+ val context = getDqContext()
+ step5.execute(context) should be (true)
+ }
+}