Posted to commits@griffin.apache.org by gu...@apache.org on 2019/07/08 13:20:14 UTC

[griffin] branch master updated: build and run transform steps with multiple threads

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 62d755d  build and run transform steps with multiple threads
62d755d is described below

commit 62d755d6f0474d4492d598d469e02c891d99bf95
Author: wankunde <wa...@163.com>
AuthorDate: Mon Jul 8 21:19:55 2019 +0800

    build and run transform steps with multiple threads
    
    There is a sequence of DQ steps in each DQJob, and those steps currently run one by one (via a foldLeft function).
    
    We can use multiple threads to run the steps that have no dependencies on one another.
    
    For example:
    
    In a DQBatchJob, an accuracyExpr will have four steps: __missRecords, __missCount, __totalCount, and accu.
    
    The __missCount and __totalCount steps can run at the same time.
    
    A SeqDQStep then only needs to contain the root steps, not their dependency steps.
    
    If each step knows its dependency steps, we can run the step itself as soon as they are ready.
    
    Running step:
    accu
    |   |---__missCount
    |   |   |---__missRecords
    |   |---__totalCount

    Running step:
    __missCount
    |   |---__missRecords

    Running step:
    __missRecords

    Running step:
    __totalCount
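
    A minimal sketch of the wiring, using the DualTransformStep test step
    added in this commit (its doExecute just sleeps for the given number
    of seconds), given a DQContext named context:

        val missRecords = DualTransformStep("__missRecords", 3)
        val missCount = DualTransformStep("__missCount", 4)
        val totalCount = DualTransformStep("__totalCount", 2)
        val accu = DualTransformStep("accu", 1)
        missCount.parentSteps += missRecords // waits for __missRecords
        accu.parentSteps += missCount        // accu waits for both counts,
        accu.parentSteps += totalCount       // which run concurrently
        accu.execute(context)                // true once every step succeeds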
    
    Author: wankunde <wa...@163.com>
    
    Closes #504 from wankunde/master.
---
 .../org/apache/griffin/measure/step/DQStep.scala   |   9 +
 .../dsl/transform/AccuracyExpr2DQSteps.scala       |  22 +-
 .../dsl/transform/CompletenessExpr2DQSteps.scala   |  15 +-
 .../dsl/transform/DistinctnessExpr2DQSteps.scala   |  28 ++-
 .../dsl/transform/TimelinessExpr2DQSteps.scala     |   7 +-
 .../dsl/transform/UniquenessExpr2DQSteps.scala     |  13 +-
 .../step/transform/DataFrameOpsTransformStep.scala |   2 +-
 .../step/transform/SparkSqlTransformStep.scala     |   2 +-
 .../measure/step/transform/TransformStep.scala     |  79 +++++++
 .../apache/griffin/measure/utils/ThreadUtils.scala | 227 +++++++++++++++++++++
 .../griffin/measure/step/TransformStepTest.scala   |  91 +++++++++
 11 files changed, 457 insertions(+), 38 deletions(-)

diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
index 60c8477..6a50ebb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
@@ -33,3 +33,12 @@ trait DQStep extends Loggable {
   def getNames(): Seq[String] = name :: Nil
 
 }
+
+object DQStepStatus extends Enumeration {
+  val PENDING = Value
+  val RUNNING = Value
+  val COMPLETE = Value
+  val FAILED = Value
+}
+
+
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index f7ff3ef..31eef69 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -111,6 +111,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
             s"FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
       }
       val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
+      missCountTransStep.parentSteps += missRecordsTransStep
 
       // 3. total count
       val totalCountTableName = "__totalCount"
@@ -151,6 +152,8 @@ case class AccuracyExpr2DQSteps(context: DQContext,
          """.stripMargin
       }
       val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
+      accuracyTransStep.parentSteps += missCountTransStep
+      accuracyTransStep.parentSteps += totalCountTransStep
       val accuracyMetricWriteSteps = procType match {
         case BatchProcessType =>
           val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
@@ -160,14 +163,12 @@ case class AccuracyExpr2DQSteps(context: DQContext,
         case StreamingProcessType => Nil
       }
 
-      // accuracy current steps
-      val transSteps1 = missRecordsTransStep :: missCountTransStep :: totalCountTransStep :: accuracyTransStep :: Nil
-      val writeSteps1 =
+      val batchWriteSteps =
         accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
 
-      // streaming extra steps
-      val (transSteps2, writeSteps2) = procType match {
-        case BatchProcessType => (Nil, Nil)
+      procType match {
+        case BatchProcessType => accuracyTransStep :: batchWriteSteps
+        // streaming extra steps
         case StreamingProcessType =>
           // 5. accuracy metric merge
           val accuracyMetricTableName = "__accuracy"
@@ -179,6 +180,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           )
           val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
             accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
+          accuracyMetricTransStep.parentSteps += accuracyTransStep
           val accuracyMetricWriteStep = {
             val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
             val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
@@ -196,6 +198,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           }
           val accuracyRecordTransStep = SparkSqlTransformStep(
             accuracyRecordTableName, accuracyRecordSql, emptyMap)
+          accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
           val accuracyRecordWriteStep = {
             val rwName =
               ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -205,12 +208,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           }
 
           // extra steps
-          (accuracyMetricTransStep :: accuracyRecordTransStep :: Nil,
-            accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil)
+          val streamingWriteSteps = accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil
+          accuracyRecordTransStep :: batchWriteSteps ++ streamingWriteSteps
       }
-
-      // full steps
-      transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2
     }
   }
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 87cfa86..3df4a12 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -93,6 +93,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
 
       val incompleteRecordTransStep =
         SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
+      incompleteRecordTransStep.parentSteps += sourceAliasTransStep
       val incompleteRecordWriteStep = {
         val rwName =
           ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
@@ -112,6 +113,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       }
       val incompleteCountTransStep =
         SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)
+      incompleteCountTransStep.parentSteps += incompleteRecordTransStep
 
       // 4. total count
       val totalCountTableName = "__totalCount"
@@ -124,6 +126,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
             s"FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
       }
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
+      totalCountTransStep.parentSteps += sourceAliasTransStep
 
       // 5. complete metric
       val completeTableName = ruleParam.getOutDfName()
@@ -147,6 +150,8 @@ case class CompletenessExpr2DQSteps(context: DQContext,
          """.stripMargin
       }
       val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
+      completeTransStep.parentSteps += incompleteCountTransStep
+      completeTransStep.parentSteps += totalCountTransStep
       val completeWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
         val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
@@ -154,14 +159,8 @@ case class CompletenessExpr2DQSteps(context: DQContext,
         MetricWriteStep(mwName, completeTableName, flattenType)
       }
 
-      val transSteps = {
-        sourceAliasTransStep :: incompleteRecordTransStep ::
-          incompleteCountTransStep :: totalCountTransStep ::
-          completeTransStep :: Nil
-      }
-      val writeSteps = {
-        incompleteRecordWriteStep :: completeWriteStep :: Nil
-      }
+      val transSteps = completeTransStep :: Nil
+      val writeSteps = incompleteRecordWriteStep :: completeWriteStep :: Nil
 
       // full steps
       transSteps ++ writeSteps
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 70fee6c..0e2b10e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -111,6 +111,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
         s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
       }
       val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
+      totalTransStep.parentSteps += sourceAliasTransStep
       val totalMetricWriteStep = {
         MetricWriteStep(totalColName, totalTableName, EntriesFlattenType, writeTimestampOpt)
       }
@@ -128,8 +129,9 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
       }
       val selfGroupTransStep =
         SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+      selfGroupTransStep.parentSteps += sourceAliasTransStep
 
-      val transSteps1 = sourceAliasTransStep :: totalTransStep :: selfGroupTransStep :: Nil
+      val transSteps1 = totalTransStep :: selfGroupTransStep :: Nil
       val writeSteps1 = totalMetricWriteStep :: Nil
 
       val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
@@ -163,6 +165,8 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
             """.stripMargin
           }
           val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)
+          joinedTransStep.parentSteps += selfGroupTransStep
+          joinedTransStep.parentSteps += olderAliasTransStep
 
           // 6. group by joined data
           val groupTableName = "__group"
@@ -176,6 +180,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
              """.stripMargin
           }
           val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap)
+          groupTransStep.parentSteps += joinedTransStep
 
           // 7. final duplicate count
           val finalDupCountTableName = "__finalDupCount"
@@ -204,12 +209,13 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val finalDupCountTransStep =
             SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+          finalDupCountTransStep.parentSteps += groupTransStep
 
-          ((olderAliasTransStep :: joinedTransStep
-            :: groupTransStep :: finalDupCountTransStep :: Nil,
-            targetDsUpdateWriteStep :: Nil), finalDupCountTableName)
+          ((finalDupCountTransStep :: Nil, targetDsUpdateWriteStep :: Nil),
+            finalDupCountTableName)
         case _ =>
-          ((Nil, Nil), selfGroupTableName)
+          ((selfGroupTransStep :: Nil, totalMetricWriteStep :: Nil),
+            selfGroupTableName)
       }
 
       // 8. distinct metric
@@ -262,6 +268,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
                """.stripMargin
           }
           val rnTransStep = SparkSqlTransformStep(rnTableName, rnSql, emptyMap)
+          rnTransStep.parentSteps += informedTransStep
 
           // 11. recognize duplicate items
           val dupItemsTableName = "__dupItems"
@@ -272,6 +279,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
                """.stripMargin
           }
           val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
+          dupItemsTransStep.parentSteps += rnTransStep
           val dupItemsWriteStep = {
             val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
             RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
@@ -289,6 +297,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val groupDupMetricTransStep =
             SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
+          groupDupMetricTransStep.parentSteps += dupItemsTransStep
           val groupDupMetricWriteStep = {
             MetricWriteStep(duplicationArrayName,
               groupDupMetricTableName,
@@ -296,9 +305,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
               writeTimestampOpt)
           }
 
-          val msteps = {
-            informedTransStep :: rnTransStep :: dupItemsTransStep :: groupDupMetricTransStep :: Nil
-          }
+          val msteps = groupDupMetricTransStep :: Nil
           val wsteps = if (recordEnable) {
             dupItemsWriteStep :: groupDupMetricWriteStep :: Nil
           } else {
@@ -344,6 +351,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
               """.stripMargin
           }
           val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
+          dupMetricTransStep.parentSteps += dupRecordTransStep
           val dupMetricWriteStep = {
             MetricWriteStep(
               duplicationArrayName,
@@ -353,9 +361,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
             )
           }
 
-          val msteps = {
-            dupRecordTransStep :: dupMetricTransStep :: Nil
-          }
+          val msteps = dupMetricTransStep :: Nil
           val wsteps = if (recordEnable) {
             dupRecordWriteStep :: dupMetricWriteStep :: Nil
           } else {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 71eb452..03c2c8d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -105,6 +105,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
           s"FROM `${inTimeTableName}`"
       }
       val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
+      latencyTransStep.parentSteps += inTimeTransStep
 
       // 3. timeliness metric
       val metricTableName = ruleParam.getOutDfName()
@@ -129,6 +130,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
            """.stripMargin
       }
       val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
+      metricTransStep.parentSteps += latencyTransStep
       val metricWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
         val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
@@ -137,7 +139,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       }
 
       // current steps
-      val transSteps1 = inTimeTransStep :: latencyTransStep :: metricTransStep :: Nil
+      val transSteps1 = metricTransStep :: Nil
       val writeSteps1 = metricWriteStep :: Nil
 
       // 4. timeliness record
@@ -190,11 +192,12 @@ case class TimelinessExpr2DQSteps(context: DQContext,
           }
           val rangeMetricTransStep =
             SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+          rangeMetricTransStep.parentSteps += rangeTransStep
           val rangeMetricWriteStep = {
             MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
           }
 
-          (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
+          (rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
         case _ => (Nil, Nil)
       }
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 28e9d48..7f259ea 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -64,7 +64,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
       warn(s"[${timestamp}] data source ${sourceName} not exists")
       Nil
     } else if (!context.runTimeTableRegister.existsTable(targetName)) {
-      println(s"[${timestamp}] data source ${targetName} not exists")
+      warn(s"[${timestamp}] data source ${targetName} not exists")
       Nil
     } else {
       val selItemsClause = analyzer.selectionPairs.map { pair =>
@@ -104,6 +104,8 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
       }
       val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)
+      joinedTransStep.parentSteps += sourceTransStep
+      joinedTransStep.parentSteps += targetTransStep
 
       // 4. group
       val groupTableName = "__group"
@@ -116,6 +118,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
           s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
       }
       val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true)
+      groupTransStep.parentSteps += joinedTransStep
 
       // 5. total metric
       val totalTableName = "__totalMetric"
@@ -138,6 +141,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
       }
       val uniqueRecordTransStep =
         SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
+      uniqueRecordTransStep.parentSteps += groupTransStep
 
       // 7. unique metric
       val uniqueTableName = "__uniqueMetric"
@@ -152,12 +156,12 @@ case class UniquenessExpr2DQSteps(context: DQContext,
            """.stripMargin
       }
       val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
+      uniqueTransStep.parentSteps += uniqueRecordTransStep
 
       val uniqueMetricWriteStep =
         MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
 
-      val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep ::
-        totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil
+      val transSteps1 = totalTransStep :: uniqueTransStep :: Nil
       val writeSteps1 = totalMetricWriteStep :: uniqueMetricWriteStep :: Nil
 
       val duplicationArrayName = details.getString(_duplicationArray, "")
@@ -198,11 +202,12 @@ case class UniquenessExpr2DQSteps(context: DQContext,
           """.stripMargin
         }
         val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
+        dupMetricTransStep.parentSteps += dupRecordTransStep
         val dupMetricWriteStep = {
           MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType)
         }
 
-        (dupRecordTransStep :: dupMetricTransStep :: Nil,
+        (dupMetricTransStep :: Nil,
           dupRecordWriteStep :: dupMetricWriteStep :: Nil)
       } else (Nil, Nil)
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 4ac35b2..b07595a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -30,7 +30,7 @@ case class DataFrameOpsTransformStep(name: String,
                                      cache: Boolean = false
                                     ) extends TransformStep {
 
-  def execute(context: DQContext): Boolean = {
+  def doExecute(context: DQContext): Boolean = {
     val sqlContext = context.sqlContext
     try {
       val df = rule match {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index 39b6a0e..59ea822 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -29,7 +29,7 @@ case class SparkSqlTransformStep(name: String,
                                  cache: Boolean = false
                                 ) extends TransformStep {
 
-  def execute(context: DQContext): Boolean = {
+  def doExecute(context: DQContext): Boolean = {
     val sqlContext = context.sqlContext
     try {
       val df = sqlContext.sql(rule)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
index 995ce49..8c094b2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
@@ -18,7 +18,15 @@ under the License.
 */
 package org.apache.griffin.measure.step.transform
 
+import scala.collection.mutable.HashSet
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.DQStepStatus._
+import org.apache.griffin.measure.utils.ThreadUtils
 
 trait TransformStep extends DQStep {
 
@@ -28,4 +36,75 @@ trait TransformStep extends DQStep {
 
   val cache: Boolean
 
+  var status = PENDING
+
+  val parentSteps = new HashSet[TransformStep]
+
+  def doExecute(context: DQContext): Boolean
+
+  def execute(context: DQContext): Boolean = {
+    val threadName = Thread.currentThread().getName
+    info(threadName + " begin transform step : \n" + debugString())
+    // Submit parents Steps
+    val parentStepFutures = parentSteps.filter(checkAndUpdateStatus).map { parentStep =>
+      Future {
+        val result = parentStep.execute(context)
+        parentStep.synchronized {
+          if (result) {
+            parentStep.status = COMPLETE
+          } else {
+            parentStep.status = FAILED
+          }
+        }
+      }(TransformStep.transformStepContext)
+    }
+    ThreadUtils.awaitResult(
+      Future.sequence(parentStepFutures)(implicitly, TransformStep.transformStepContext),
+      Duration.Inf)
+
+    parentSteps.map(step => {
+      while (step.status == RUNNING) {
+        Thread.sleep(1000L)
+      }
+    })
+    val prepared = parentSteps.foldLeft(true)((ret, step) => ret && step.status == COMPLETE)
+    if (prepared) {
+      val res = doExecute(context)
+      info(threadName + " end transform step : \n" + debugString())
+      res
+    } else {
+      error("Parent transform step failed!")
+      false
+    }
+  }
+
+  def checkAndUpdateStatus(step: TransformStep): Boolean = {
+    step.synchronized {
+      if (step.status == PENDING) {
+        step.status = RUNNING
+        true
+      } else {
+        false
+      }
+    }
+  }
+
+  def debugString(level: Int = 0): String = {
+    val stringBuffer = new StringBuilder
+    if (level > 0) {
+      for (i <- 0 to level - 1) {
+        stringBuffer.append("|   ")
+      }
+      stringBuffer.append("|---")
+    }
+    stringBuffer.append(name + "\n")
+    parentSteps.foreach(parentStep => stringBuffer.append(parentStep.debugString(level + 1)))
+    stringBuffer.toString()
+  }
 }
+
+object TransformStep {
+  private[transform] val transformStepContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("transform-step"))
+}
+
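
To make the new contract concrete: subclasses implement doExecute, while the
inherited execute submits each still-pending parent step to the shared
"transform-step" pool and only calls doExecute once every parent reaches
COMPLETE. A minimal sketch of a custom step (HelloTransformStep is
hypothetical, not part of this commit):

    import org.apache.griffin.measure.context.DQContext
    import org.apache.griffin.measure.step.transform.TransformStep

    // Only doExecute is implemented; the inherited execute() runs
    // parentSteps first, in parallel where the DAG allows it.
    case class HelloTransformStep(name: String,
                                  rule: String = "",
                                  details: Map[String, Any] = Map(),
                                  cache: Boolean = false
                                 ) extends TransformStep {
      def doExecute(context: DQContext): Boolean = {
        info(s"hello from $name") // Loggable is mixed in via DQStep
        true
      }
    }
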
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
new file mode 100644
index 0000000..d484ec9
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ThreadUtils.scala
@@ -0,0 +1,227 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import java.util.concurrent._
+
+import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.duration.Duration
+import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
+import scala.util.control.NonFatal
+
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+
+private[griffin] object ThreadUtils {
+
+  private val sameThreadExecutionContext =
+    ExecutionContext.fromExecutorService(MoreExecutors.sameThreadExecutor())
+
+  /**
+   * An `ExecutionContextExecutor` that runs each task in the thread that invokes `execute/submit`.
+   * The caller should make sure the tasks running in this `ExecutionContextExecutor` are short and
+   * never block.
+   */
+  def sameThread: ExecutionContextExecutor = sameThreadExecutionContext
+
+  /**
+   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
+   */
+  def namedThreadFactory(prefix: String): ThreadFactory = {
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
+  }
+
+  /**
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
+
+  /**
+   * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
+   * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
+   */
+  def newDaemonCachedThreadPool(
+      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    val threadPool = new ThreadPoolExecutor(
+      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingQueue, this one is not used
+      keepAliveSeconds,
+      TimeUnit.SECONDS,
+      new LinkedBlockingQueue[Runnable],
+      threadFactory)
+    threadPool.allowCoreThreadTimeOut(true)
+    threadPool
+  }
+
+  /**
+   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
+
+  /**
+   * Wrapper over newSingleThreadExecutor.
+   */
+  def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    Executors.newSingleThreadExecutor(threadFactory)
+  }
+
+  /**
+   * Wrapper over ScheduledThreadPoolExecutor.
+   */
+  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
+    // By default, a cancelled task is not automatically removed from the work queue until its delay
+    // elapses. We have to enable it manually.
+    executor.setRemoveOnCancelPolicy(true)
+    executor
+  }
+
+  /**
+   * Run a piece of code in a new thread and return the result. Exception in the new thread is
+   * thrown in the caller thread with an adjusted stack trace that removes references to this
+   * method for clarity. The exception stack traces will be like the following
+   *
+   * SomeException: exception-message
+   *   at CallerClass.body-method (sourcefile.scala)
+   *   at ... run in separate thread using org.apache.griffin.measure.utils.ThreadUtils ... ()
+   *   at CallerClass.caller-method (sourcefile.scala)
+   *   ...
+   */
+  def runInNewThread[T](
+      threadName: String,
+      isDaemon: Boolean = true)(body: => T): T = {
+    @volatile var exception: Option[Throwable] = None
+    @volatile var result: T = null.asInstanceOf[T]
+
+    val thread = new Thread(threadName) {
+      override def run(): Unit = {
+        try {
+          result = body
+        } catch {
+          case NonFatal(e) =>
+            exception = Some(e)
+        }
+      }
+    }
+    thread.setDaemon(isDaemon)
+    thread.start()
+    thread.join()
+
+    exception match {
+      case Some(realException) =>
+        // Remove the part of the stack that shows method calls into this helper method
+        // This means drop everything from the top until the stack element
+        // ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
+        val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)
+
+        // Remove the part of the new thread stack that shows methods call from this helper method
+        val extraStackTrace = realException.getStackTrace.takeWhile(
+          ! _.getClassName.contains(this.getClass.getSimpleName))
+
+        // Combine the two stack traces, with a place holder just specifying that there
+        // was a helper method used, without any further details of the helper
+        val placeHolderStackElem = new StackTraceElement(
+          s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
+          " ", "", -1)
+        val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace
+
+        // Update the stack trace and rethrow the exception in the caller thread
+        realException.setStackTrace(finalStackTrace)
+        throw realException
+      case None =>
+        result
+    }
+  }
+
+  /**
+   * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
+   */
+  def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+    // Custom factory to set thread names
+    val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
+      override def newThread(pool: SForkJoinPool) =
+        new SForkJoinWorkerThread(pool) {
+          setName(prefix + "-" + super.getName)
+        }
+    }
+    new SForkJoinPool(maxThreadNumber, factory,
+      null, // handler
+      false // asyncMode
+    )
+  }
+
+  // scalastyle:off awaitresult
+  /**
+   * Preferred alternative to `Await.result()`.
+   *
+   * This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
+   * that this thread's stack trace appears in logs.
+   *
+   * In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
+   * `BlockingContext`. Code running in the user's thread may be in a thread of a Scala ForkJoinPool.
+   * As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
+   * method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
+   * In general, we should use this method because it's hard to debug when [[ThreadLocal]]s leak
+   * to other tasks.
+   */
+  @throws(classOf[Exception])
+  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+      awaitable.result(atMost)(awaitPermission)
+    } catch {
+      // TimeoutException is thrown in the current thread, so there is no need to wrap the exception.
+      case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
+        throw new Exception("Exception thrown in awaitResult: ", t)
+    }
+  }
+  // scalastyle:on awaitresult
+
+  // scalastyle:off awaitready
+  /**
+   * Preferred alternative to `Await.ready()`.
+   *
+   * @see [[awaitResult]]
+   */
+  @throws(classOf[Exception])
+  def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+      awaitable.ready(atMost)(awaitPermission)
+    } catch {
+      // TimeoutException is thrown in the current thread, so there is no need to wrap the exception.
+      case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
+        throw new Exception("Exception thrown in awaitResult: ", t)
+    }
+  }
+  // scalastyle:on awaitready
+}
\ No newline at end of file
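
These helpers closely mirror Spark's ThreadUtils. A typical combination is a
named daemon pool wrapped as an ExecutionContext, with awaitResult in place
of Await.result; a hedged usage sketch (the "demo" prefix and the trivial
Future are illustrative only):

    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    import org.apache.griffin.measure.utils.ThreadUtils

    // Named daemon pool, capped at 4 threads, wrapped as an ExecutionContext.
    val pool = ThreadUtils.newDaemonCachedThreadPool("demo", maxThreadNumber = 4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    val f = Future { 21 * 2 }
    // awaitResult rethrows failures with this thread's stack trace attached.
    val answer = ThreadUtils.awaitResult(f, Duration.Inf) // 42
    pool.shutdown()
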
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
new file mode 100644
index 0000000..e640c45
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.context.ContextId
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.step.transform.TransformStep
+
+import org.scalatest._
+
+class TransformStepTest extends FlatSpec with Matchers with DataFrameSuiteBase with Loggable {
+
+  case class DualTransformStep(name: String,
+                               duration: Int,
+                               rule: String = "",
+                               details: Map[String, Any] = Map(),
+                               cache: Boolean = false
+                              ) extends TransformStep {
+
+    def doExecute(context: DQContext): Boolean = {
+      val threadName = Thread.currentThread().getName
+      info(s"Step $name started with $threadName")
+      Thread.sleep(duration * 1000L)
+      info(s"Step $name finished with $threadName")
+      true
+    }
+  }
+
+  private def getDqContext(name: String = "test-context"): DQContext = {
+    DQContext(
+      ContextId(System.currentTimeMillis),
+      name,
+      Nil,
+      Nil,
+      BatchProcessType
+    )(spark)
+  }
+
+  /**
+    * Run transform steps in parallel. Here are the dependencies of transform steps
+    *
+    * step5
+    * |   |---step2
+    * |   |   |---step1
+    * |   |---step3
+    * |   |   |---step1
+    * |   |---step4
+    *
+    * step1 : -->
+    * step2 :    --->
+    * step3 :    ---->
+    * step4 : ->
+    * step5 :         -->
+    *
+    */
+  "transform step " should "be run steps in parallel" in {
+    val step1 = DualTransformStep("step1", 3)
+    val step2 = DualTransformStep("step2", 4)
+    step2.parentSteps += step1
+    val step3 = DualTransformStep("step3", 5)
+    step3.parentSteps += step1
+    val step4 = DualTransformStep("step4", 2)
+    val step5 = DualTransformStep("step5", 3)
+    step5.parentSteps += step2
+    step5.parentSteps += step3
+    step5.parentSteps += step4
+
+    val context = getDqContext()
+    step5.execute(context) should be (true)
+  }
+}