Posted to by on 2017/11/19 02:28:53 UTC

[10/23] incubator-s2graph git commit: add TraversalHelper.

add TraversalHelper.


Branch: refs/heads/master
Commit: 7d082255944228a34e8ea55d9609202ab65362b2
Parents: aa66822
Author: DO YUNG YOON <>
Authored: Sat Nov 4 07:19:27 2017 +0900
Committer: DO YUNG YOON <>
Committed: Sat Nov 4 07:19:27 2017 +0900

 .../org/apache/s2graph/core/PostProcess.scala   | 380 ----------------
 .../scala/org/apache/s2graph/core/S2Graph.scala |  58 +--
 .../apache/s2graph/core/TraversalHelper.scala   | 442 +++++++++++++++++++
 .../apache/s2graph/core/storage/StorageIO.scala |   2 +-
 4 files changed, 450 insertions(+), 432 deletions(-)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 4549d84..2d2e183 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -281,384 +281,4 @@ object PostProcess {
       withOptionalFields(queryOption, results.size, degrees, results, stepResult.failCount, cursorJson, nextQuery)
-  /** Global helper functions */
-  @tailrec
-  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
-    if (range < sampleNumber || set.size == sampleNumber) set
-    else randomInt(sampleNumber, range, set + Random.nextInt(range))
-  }
-  def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
-    if (edges.size <= n) {
-      edges
-    } else {
-      val plainEdges = if (queryRequest.queryParam.offset == 0) {
-        edges.tail
-      } else edges
-      val randoms = randomInt(n, plainEdges.size)
-      var samples = List.empty[EdgeWithScore]
-      var idx = 0
-      plainEdges.foreach { e =>
-        if (randoms.contains(idx)) samples = e :: samples
-        idx += 1
-      }
-      samples
-    }
-  }
-  def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
-    val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
- { edgeWithScore =>
-      edgeWithScore.copy(score = edgeWithScore.score / sum)
-    }
-  }
-  def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2VertexLike), Boolean] = {
-    val vertices = for {
-      edgeWithScore <- edgeWithScoreLs
-      edge = edgeWithScore.edge
-      vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
-    } yield (edge.labelWithDir, vertex) -> true
-    vertices.toMap
-  }
-  /** common methods for filter out, transform, aggregate queryResult */
-  def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = {
-    for {
-      convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
-    } yield convertedEdge
-  }
-  def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = {
-    /* process time decay */
-    val tsVal = queryParam.timeDecay match {
-      case None => 1.0
-      case Some(timeDecay) =>
-        val tsVal = try {
-          val innerValWithTsOpt = edge.propertyValue(
- { innerValWithTs =>
-            val innerVal = innerValWithTs.innerVal
-            timeDecay.labelMeta.dataType match {
-              case InnerVal.LONG => innerVal.value match {
-                case n: BigDecimal => n.bigDecimal.longValue()
-                case _ => innerVal.toString().toLong
-              }
-              case _ => innerVal.toString().toLong
-            }
-          } getOrElse (edge.ts)
-        } catch {
-          case e: Exception =>
-            logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
-            edge.ts
-        }
-        val timeDiff = queryParam.timestamp - tsVal
-        timeDecay.decay(timeDiff)
-    }
-    tsVal
-  }
-  def processDuplicates[R](queryParam: QueryParam,
-                           duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = {
-    if (queryParam.label.consistencyLevel != "strong") {
-      //TODO:
-      queryParam.duplicatePolicy match {
-        case DuplicatePolicy.First => Seq(duplicates.head)
-        case DuplicatePolicy.Raw => duplicates
-        case DuplicatePolicy.CountSum =>
-          val countSum = duplicates.size
-          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
-          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum))
-        case _ =>
-          val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) }
-          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
-          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum))
-      }
-    } else {
-      duplicates
-    }
-  }
-  def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = {
-    val src = edge.srcVertex.innerId.hashCode()
-    val tgt = edge.tgtVertex.innerId.hashCode()
-    val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
-    val filterHashKey = (src, tgt)
-    (hashKey, filterHashKey)
-  }
-  def filterEdges(q: Query,
-                  stepIdx: Int,
-                  queryRequests: Seq[QueryRequest],
-                  queryResultLsFuture: Future[Seq[StepResult]],
-                  queryParams: Seq[QueryParam],
-                  alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean] = Map.empty,
-                  buildLastStepInnerResult: Boolean = true,
-                  parentEdges: Map[VertexId, Seq[EdgeWithScore]])
-                 (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
- { queryRequestWithResultLs =>
-      val (cursors, failCount) = {
-        val _cursors = ArrayBuffer.empty[Array[Byte]]
-        var _failCount = 0
-        queryRequestWithResultLs.foreach { stepResult =>
-          _cursors.append(stepResult.cursors: _*)
-          _failCount += stepResult.failCount
-        }
-        _cursors -> _failCount
-      }
-      if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount)
-      else {
-        val isLastStep = stepIdx == q.steps.size - 1
-        val queryOption = q.queryOption
-        val step = q.steps(stepIdx)
-        val currentStepResults =
-        val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult
-        val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
-        if (shouldBuildInnerResults) {
-          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
-            edgeWithScore
-          }
-          /* process step group by */
-          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
-          StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
-        } else {
-          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
-            val edge = edgeWithScore.edge
-            val score = edgeWithScore.score
-            val label = edgeWithScore.label
-            /* Select */
-            val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
-            //            val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
-            val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
-            val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
-            /* OrderBy */
-            val orderByValues =
-              if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
-              else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
-            /* StepGroupBy */
-            val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
-            /* GroupBy */
-            val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
-            /* FilterOut */
-            val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
-            newEdgeWithScore.copy(orderByValues = orderByValues,
-              stepGroupByValues = stepGroupByValues,
-              groupByValues = groupByValues,
-              filterOutValues = filterOutValues)
-          }
-          /* process step group by */
-          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
-          /* process ordered list */
-          val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
-          /* process grouped list */
-          val grouped =
-            if (queryOption.groupBy.keys.isEmpty) Nil
-            else {
-              val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
-              results.groupBy { edgeWithScore =>
-                //      
-                edgeWithScore.groupByValues
-              }.foreach { case (k, ls) =>
-                val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption)
-                val newScoreSum = scoreSum
-                /*
-                  * watch out here. by calling toString on Any, we lose type information which will be used
-                  * later for toJson.
-                  */
-                if (merged.nonEmpty) {
-                  val newKey = merged.head.groupByValues
-                  agg += ((newKey, (newScoreSum, merged)))
-                }
-              }
-              agg.toSeq.sortBy(_._2._1 * -1)
-            }
-          StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
-        }
-      }
-    }
-  }
-  def toEdgeWithScores(queryRequest: QueryRequest,
-                               stepResult: StepResult,
-                               parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = {
-    val queryOption = queryRequest.query.queryOption
-    val queryParam = queryRequest.queryParam
-    val prevScore = queryRequest.prevStepScore
-    val labelWeight = queryRequest.labelWeight
-    val edgeWithScores = stepResult.edgeWithScores
-    val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
-    val parents = if (shouldBuildParents) {
-      parentEdges.getOrElse(, Nil).map { edgeWithScore =>
-        val edge = edgeWithScore.edge
-        val score = edgeWithScore.score
-        val label = edgeWithScore.label
-        /* Select */
-        val mergedPropsWithTs =
-          if (queryOption.selectColumns.isEmpty) {
-            edge.propertyValuesInner()
-          } else {
-            val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp))
-            edge.propertyValues(queryOption.selectColumns) ++ initial
-          }
-        val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
-        edgeWithScore.copy(edge = newEdge)
-      }
-    } else Nil
-    // skip
-    if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores
-    else {
-      val degreeScore = 0.0
-      val sampled =
-        if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
-        else edgeWithScores
-      val withScores = for {
-        edgeWithScore <- sampled
-      } yield {
-        val edge = edgeWithScore.edge
-        val edgeScore = edgeWithScore.score
-        val score = queryParam.scorePropagateOp match {
-          case "plus" => edgeScore + prevScore
-          case "divide" =>
-            if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
-            else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
-          case _ => edgeScore * prevScore
-        }
-        val tsVal = processTimeDecay(queryParam, edge)
-        val newScore = degreeScore + score
-        //          val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
-        val newEdge = edge.copyParentEdges(parents)
-        edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
-      }
-      val normalized =
-        if (queryParam.shouldNormalize) normalize(withScores)
-        else withScores
-      normalized
-    }
-  }
-  def buildResult[R](query: Query,
-                             stepIdx: Int,
-                             stepResultLs: Seq[(QueryRequest, StepResult)],
-                             parentEdges: Map[VertexId, Seq[EdgeWithScore]])
-                            (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R)
-                            (implicit ev: WithScore[R]): ListBuffer[R] = {
-    import scala.collection._
-    val results = ListBuffer.empty[R]
-    val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty
-    val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty
-    val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
-    val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
-    var numOfDuplicates = 0
-    val queryOption = query.queryOption
-    val step = query.steps(stepIdx)
-    val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet
-    val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet
-    stepResultLs.foreach { case (queryRequest, stepInnerResult) =>
-      val queryParam = queryRequest.queryParam
-      val label = queryParam.label
-      val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir)
-      val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir)
-      val propsSelectColumns = (for {
-        column <- queryOption.propsSelectColumns
-        labelMeta <- label.metaPropsInvMap.get(column)
-      } yield labelMeta)
-      for {
-        edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
-      } {
-        val edge = edgeWithScore.edge
-        val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
-        //        params += (hashKey -> queryParam) //
-        /* check if this edge should be exlcuded. */
-        if (shouldBeExcluded) {
-          edgesToExclude.add(filterHashKey)
-        } else {
-          if (shouldBeIncluded) {
-            edgesToInclude.add(filterHashKey)
-          }
-          val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns)
-          sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
-          duplicates.get(hashKey) match {
-            case None =>
-              val newLs = ListBuffer.empty[(FilterHashKey, R)]
-              newLs += (filterHashKey -> newEdgeWithScore)
-              duplicates += (hashKey -> newLs) //
-            case Some(old) =>
-              numOfDuplicates += 1
-              old += (filterHashKey -> newEdgeWithScore) //
-          }
-        }
-      }
-    }
-    if (numOfDuplicates == 0) {
-      // no duplicates at all.
-      for {
-        (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs
-        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
-      } {
-        results += edgeWithScore
-      }
-    } else {
-      // need to resolve duplicates.
-      val seen = new mutable.HashSet[HashKey]()
-      for {
-        (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs
-        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
-        if !seen.contains(hashKey)
-      } {
-        //        val queryParam = params(hashKey)
-        processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) =>
-          if (ev.score(duplicate) >= queryParam.threshold) {
-            seen += hashKey
-            results += duplicate
-          }
-        }
-      }
-    }
-    results
-  }
\ No newline at end of file
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 5e23f9b..f061160 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -25,25 +25,15 @@ import java.util.concurrent.{Executors, TimeUnit}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
-import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.features.S2GraphVariables
 import org.apache.s2graph.core.index.IndexProvider
 import org.apache.s2graph.core.mysqls._
-import{MutateResponse, SKeyValue, Storage}
+import{MutateResponse, Storage}
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.PostProcess._
 import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies
-import org.apache.tinkerpop.gremlin.structure
-import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions
-import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables}
-import{GraphReader, GraphWriter, Io, Mapper}
-import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex}
-import play.api.libs.json.{JsObject, Json}
+import org.apache.tinkerpop.gremlin.structure.{Edge, Graph}
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -577,6 +567,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
   val elementBuilder = new GraphElementBuilder(this)
+  val traversalHelper = new TraversalHelper(this)
   def getStorage(service: Service): Storage = {
     storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
@@ -671,48 +663,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
                 buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
     if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
     else {
-      val edgeWithScoreLs = stepInnerResult.edgeWithScores
-      val q = orgQuery
-      val queryOption = orgQuery.queryOption
-      val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
-      val prevStepThreshold =
-      val prevStepLimit =
-      val step = q.steps(stepIdx)
-     val alreadyVisited =
-        if (stepIdx == 0) Map.empty[(LabelWithDirection, S2VertexLike), Boolean]
-        else alreadyVisitedVertices(stepInnerResult.edgeWithScores)
-      val initial = (Map.empty[S2VertexLike, Double], Map.empty[S2VertexLike, ArrayBuffer[EdgeWithScore]])
-      val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) =>
-        val key = edgeWithScore.edge.tgtVertex
-        val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score
-        val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore])
-        buffer += edgeWithScore
-        (sum + (key -> newScore), group + (key -> buffer))
-      }
-      val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold)
-      val prevStepTgtVertexIdEdges = => -> t._2)
-      val nextStepSrcVertices = if (prevStepLimit >= 0) {
-        groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
-      } else {
-        groupedByFiltered.toSeq
-      }
-      val queryRequests = for {
-        (vertex, prevStepScore) <- nextStepSrcVertices
-        queryParam <- step.queryParams
-      } yield {
-        val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
-        val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0
-        QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight)
-      }
+      val (alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean], prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) =
+        traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult)
       val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
-      filterEdges(orgQuery, stepIdx, queryRequests,
+      traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests,
         fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
new file mode 100644
index 0000000..58da145
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -0,0 +1,442 @@
+package org.apache.s2graph.core
+import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection, VertexId}
+import org.apache.s2graph.core.utils.logger
+import scala.annotation.tailrec
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.concurrent.Future
+import scala.util.Random
+object TraversalHelper {
+  @tailrec
+  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = Set.empty[Int]): Set[Int] = {
+    if (range < sampleNumber || set.size == sampleNumber) set
+    else randomInt(sampleNumber, range, set + Random.nextInt(range))
+  }
+  def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], n: Int): Seq[EdgeWithScore] = {
+    if (edges.size <= n) {
+      edges
+    } else {
+      val plainEdges = if (queryRequest.queryParam.offset == 0) {
+        edges.tail
+      } else edges
+      val randoms = randomInt(n, plainEdges.size)
+      var samples = List.empty[EdgeWithScore]
+      var idx = 0
+      plainEdges.foreach { e =>
+        if (randoms.contains(idx)) samples = e :: samples
+        idx += 1
+      }
+      samples
+    }
+  }
+  def normalize(edgeWithScores: Seq[EdgeWithScore]): Seq[EdgeWithScore] = {
+    val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + cur.score }
+ { edgeWithScore =>
+      edgeWithScore.copy(score = edgeWithScore.score / sum)
+    }
+  }
+  def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2VertexLike), Boolean] = {
+    val vertices = for {
+      edgeWithScore <- edgeWithScoreLs
+      edge = edgeWithScore.edge
+      vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex
+    } yield (edge.labelWithDir, vertex) -> true
+    vertices.toMap
+  }
+  /** common methods for filter out, transform, aggregate queryResult */
+  def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = {
+    for {
+      convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
+    } yield convertedEdge
+  }
+  def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = {
+    /* process time decay */
+    val tsVal = queryParam.timeDecay match {
+      case None => 1.0
+      case Some(timeDecay) =>
+        val tsVal = try {
+          val innerValWithTsOpt = edge.propertyValue(
+ { innerValWithTs =>
+            val innerVal = innerValWithTs.innerVal
+            timeDecay.labelMeta.dataType match {
+              case InnerVal.LONG => innerVal.value match {
+                case n: BigDecimal => n.bigDecimal.longValue()
+                case _ => innerVal.toString().toLong
+              }
+              case _ => innerVal.toString().toLong
+            }
+          } getOrElse (edge.ts)
+        } catch {
+          case e: Exception =>
+            logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
+            edge.ts
+        }
+        val timeDiff = queryParam.timestamp - tsVal
+        timeDecay.decay(timeDiff)
+    }
+    tsVal
+  }
+  def processDuplicates[R](queryParam: QueryParam,
+                           duplicates: Seq[(FilterHashKey, R)])(implicit ev: WithScore[R]): Seq[(FilterHashKey, R)] = {
+    if (queryParam.label.consistencyLevel != "strong") {
+      //TODO:
+      queryParam.duplicatePolicy match {
+        case DuplicatePolicy.First => Seq(duplicates.head)
+        case DuplicatePolicy.Raw => duplicates
+        case DuplicatePolicy.CountSum =>
+          val countSum = duplicates.size
+          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, countSum))
+        case _ =>
+          val scoreSum = duplicates.foldLeft(0.0) { case (prev, current) => prev + ev.score(current._2) }
+          val (headFilterHashKey, headEdgeWithScore) = duplicates.head
+          Seq(headFilterHashKey -> ev.withNewScore(headEdgeWithScore, scoreSum))
+      }
+    } else {
+      duplicates
+    }
+  }
+  def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = {
+    val src = edge.srcVertex.innerId.hashCode()
+    val tgt = edge.tgtVertex.innerId.hashCode()
+    val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
+    val filterHashKey = (src, tgt)
+    (hashKey, filterHashKey)
+  }
+class TraversalHelper(graph: S2GraphLike) {
+  import TraversalHelper._
+  def buildNextStepQueryRequests(orgQuery: Query, stepIdx: Int, stepInnerResult: StepResult) = {
+    val edgeWithScoreLs = stepInnerResult.edgeWithScores
+    val q = orgQuery
+    val queryOption = orgQuery.queryOption
+    val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
+    val prevStepThreshold =
+    val prevStepLimit =
+    val step = q.steps(stepIdx)
+    val alreadyVisited =
+      if (stepIdx == 0) Map.empty[(LabelWithDirection, S2VertexLike), Boolean]
+      else alreadyVisitedVertices(stepInnerResult.edgeWithScores)
+    val initial = (Map.empty[S2VertexLike, Double], Map.empty[S2VertexLike, ArrayBuffer[EdgeWithScore]])
+    val (sums, grouped) = edgeWithScoreLs.foldLeft(initial) { case ((sum, group), edgeWithScore) =>
+      val key = edgeWithScore.edge.tgtVertex
+      val newScore = sum.getOrElse(key, 0.0) + edgeWithScore.score
+      val buffer = group.getOrElse(key, ArrayBuffer.empty[EdgeWithScore])
+      buffer += edgeWithScore
+      (sum + (key -> newScore), group + (key -> buffer))
+    }
+    val groupedByFiltered = sums.filter(t => t._2 >= prevStepThreshold)
+    val prevStepTgtVertexIdEdges = => -> t._2)
+    val nextStepSrcVertices = if (prevStepLimit >= 0) {
+      groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
+    } else {
+      groupedByFiltered.toSeq
+    }
+    val queryRequests = for {
+      (vertex, prevStepScore) <- nextStepSrcVertices
+      queryParam <- step.queryParams
+    } yield {
+      val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
+      val newPrevStepScore = if (queryOption.shouldPropagateScore) prevStepScore else 1.0
+      QueryRequest(q, stepIdx, vertex, queryParam, newPrevStepScore, labelWeight)
+    }
+    (alreadyVisited, prevStepTgtVertexIdEdges, queryRequests)
+  }
+  def filterEdges(q: Query,
+                  stepIdx: Int,
+                  queryRequests: Seq[QueryRequest],
+                  queryResultLsFuture: Future[Seq[StepResult]],
+                  queryParams: Seq[QueryParam],
+                  alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean] = Map.empty,
+                  buildLastStepInnerResult: Boolean = true,
+                  parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+                 (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
+ { queryRequestWithResultLs =>
+      val (cursors, failCount) = {
+        val _cursors = ArrayBuffer.empty[Array[Byte]]
+        var _failCount = 0
+        queryRequestWithResultLs.foreach { stepResult =>
+          _cursors.append(stepResult.cursors: _*)
+          _failCount += stepResult.failCount
+        }
+        _cursors -> _failCount
+      }
+      if (queryRequestWithResultLs.isEmpty) StepResult.Empty.copy(failCount = failCount)
+      else {
+        val isLastStep = stepIdx == q.steps.size - 1
+        val queryOption = q.queryOption
+        val step = q.steps(stepIdx)
+        val currentStepResults =
+        val shouldBuildInnerResults = !isLastStep || buildLastStepInnerResult
+        val degrees = queryRequestWithResultLs.flatMap(_.degreeEdges)
+        if (shouldBuildInnerResults) {
+          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+            edgeWithScore
+          }
+          /* process step group by */
+          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+          StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+        } else {
+          val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
+            val edge = edgeWithScore.edge
+            val score = edgeWithScore.score
+            val label = edgeWithScore.label
+            /* Select */
+            val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
+            //            val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
+            val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
+            val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
+            /* OrderBy */
+            val orderByValues =
+              if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None)
+              else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys))
+            /* StepGroupBy */
+            val stepGroupByValues = newEdgeWithScore.toValues(step.groupBy.keys)
+            /* GroupBy */
+            val groupByValues = newEdgeWithScore.toValues(queryOption.groupBy.keys)
+            /* FilterOut */
+            val filterOutValues = newEdgeWithScore.toValues(queryOption.filterOutFields)
+            newEdgeWithScore.copy(orderByValues = orderByValues,
+              stepGroupByValues = stepGroupByValues,
+              groupByValues = groupByValues,
+              filterOutValues = filterOutValues)
+          }
+          /* process step group by */
+          val results = StepResult.filterOutStepGroupBy(_results, step.groupBy)
+          /* process ordered list */
+          val ordered = if (queryOption.groupBy.keys.isEmpty) StepResult.orderBy(queryOption, results) else Nil
+          /* process grouped list */
+          val grouped =
+            if (queryOption.groupBy.keys.isEmpty) Nil
+            else {
+              val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
+              results.groupBy { edgeWithScore =>
+                //      
+                edgeWithScore.groupByValues
+              }.foreach { case (k, ls) =>
+                val (scoreSum, merged) = StepResult.mergeOrdered(ls, Nil, queryOption)
+                val newScoreSum = scoreSum
+                /*
+                  * watch out here. by calling toString on Any, we lose type information which will be used
+                  * later for toJson.
+                  */
+                if (merged.nonEmpty) {
+                  val newKey = merged.head.groupByValues
+                  agg += ((newKey, (newScoreSum, merged)))
+                }
+              }
+              agg.toSeq.sortBy(_._2._1 * -1)
+            }
+          StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
+        }
+      }
+    }
+  }
+  private def toEdgeWithScores(queryRequest: QueryRequest,
+                       stepResult: StepResult,
+                       parentEdges: Map[VertexId, Seq[EdgeWithScore]]): Seq[EdgeWithScore] = {
+    val queryOption = queryRequest.query.queryOption
+    val queryParam = queryRequest.queryParam
+    val prevScore = queryRequest.prevStepScore
+    val labelWeight = queryRequest.labelWeight
+    val edgeWithScores = stepResult.edgeWithScores
+    val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+    val parents = if (shouldBuildParents) {
+      parentEdges.getOrElse(, Nil).map { edgeWithScore =>
+        val edge = edgeWithScore.edge
+        val score = edgeWithScore.score
+        val label = edgeWithScore.label
+        /* Select */
+        val mergedPropsWithTs =
+          if (queryOption.selectColumns.isEmpty) {
+            edge.propertyValuesInner()
+          } else {
+            val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp))
+            edge.propertyValues(queryOption.selectColumns) ++ initial
+          }
+        val newEdge = edge.copyEdgeWithState(mergedPropsWithTs)
+        edgeWithScore.copy(edge = newEdge)
+      }
+    } else Nil
+    // skip
+    if (queryOption.ignorePrevStepCache) stepResult.edgeWithScores
+    else {
+      val degreeScore = 0.0
+      val sampled =
+        if (queryRequest.queryParam.sample >= 0) sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
+        else edgeWithScores
+      val withScores = for {
+        edgeWithScore <- sampled
+      } yield {
+        val edge = edgeWithScore.edge
+        val edgeScore = edgeWithScore.score
+        val score = queryParam.scorePropagateOp match {
+          case "plus" => edgeScore + prevScore
+          case "divide" =>
+            if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+            else edgeScore / (prevScore + queryParam.scorePropagateShrinkage)
+          case _ => edgeScore * prevScore
+        }
+        val tsVal = processTimeDecay(queryParam, edge)
+        val newScore = degreeScore + score
+        //          val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
+        val newEdge = edge.copyParentEdges(parents)
+        edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
+      }
+      val normalized =
+        if (queryParam.shouldNormalize) normalize(withScores)
+        else withScores
+      normalized
+    }
+  }
+  private def buildResult[R](query: Query,
+                     stepIdx: Int,
+                     stepResultLs: Seq[(QueryRequest, StepResult)],
+                     parentEdges: Map[VertexId, Seq[EdgeWithScore]])
+                    (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R)
+                    (implicit ev: WithScore[R]): ListBuffer[R] = {
+    import scala.collection._
+    val results = ListBuffer.empty[R]
+    val sequentialLs: ListBuffer[(HashKey, FilterHashKey, R, QueryParam)] = ListBuffer.empty
+    val duplicates: mutable.HashMap[HashKey, ListBuffer[(FilterHashKey, R)]] = mutable.HashMap.empty
+    val edgesToExclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
+    val edgesToInclude: mutable.HashSet[FilterHashKey] = mutable.HashSet.empty
+    var numOfDuplicates = 0
+    val queryOption = query.queryOption
+    val step = query.steps(stepIdx)
+    val excludeLabelWithDirSet = step.queryParams.filter(_.exclude).map(l => l.labelWithDir).toSet
+    val includeLabelWithDirSet = step.queryParams.filter(_.include).map(l => l.labelWithDir).toSet
+    stepResultLs.foreach { case (queryRequest, stepInnerResult) =>
+      val queryParam = queryRequest.queryParam
+      val label = queryParam.label
+      val shouldBeExcluded = excludeLabelWithDirSet.contains(queryParam.labelWithDir)
+      val shouldBeIncluded = includeLabelWithDirSet.contains(queryParam.labelWithDir)
+      val propsSelectColumns = (for {
+        column <- queryOption.propsSelectColumns
+        labelMeta <- label.metaPropsInvMap.get(column)
+      } yield labelMeta)
+      for {
+        edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
+      } {
+        val edge = edgeWithScore.edge
+        val (hashKey, filterHashKey) = toHashKey(queryParam, edge, isDegree = false)
+        //        params += (hashKey -> queryParam) //
+        /* check if this edge should be exlcuded. */
+        if (shouldBeExcluded) {
+          edgesToExclude.add(filterHashKey)
+        } else {
+          if (shouldBeIncluded) {
+            edgesToInclude.add(filterHashKey)
+          }
+          val newEdgeWithScore = createFunc(edgeWithScore, propsSelectColumns)
+          sequentialLs += ((hashKey, filterHashKey, newEdgeWithScore, queryParam))
+          duplicates.get(hashKey) match {
+            case None =>
+              val newLs = ListBuffer.empty[(FilterHashKey, R)]
+              newLs += (filterHashKey -> newEdgeWithScore)
+              duplicates += (hashKey -> newLs) //
+            case Some(old) =>
+              numOfDuplicates += 1
+              old += (filterHashKey -> newEdgeWithScore) //
+          }
+        }
+      }
+    }
+    if (numOfDuplicates == 0) {
+      // no duplicates at all.
+      for {
+        (hashKey, filterHashKey, edgeWithScore, _) <- sequentialLs
+        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+      } {
+        results += edgeWithScore
+      }
+    } else {
+      // need to resolve duplicates.
+      val seen = new mutable.HashSet[HashKey]()
+      for {
+        (hashKey, filterHashKey, edgeWithScore, queryParam) <- sequentialLs
+        if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+        if !seen.contains(hashKey)
+      } {
+        //        val queryParam = params(hashKey)
+        processDuplicates(queryParam, duplicates(hashKey)).foreach { case (_, duplicate) =>
+          if (ev.score(duplicate) >= queryParam.threshold) {
+            seen += hashKey
+            results += duplicate
+          }
+        }
+      }
+    }
+    results
+  }
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 3074b4e..28b15d0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -20,7 +20,7 @@
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.PostProcess._
+import org.apache.s2graph.core.TraversalHelper._
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.LabelMeta
 import org.apache.s2graph.core.parsers.WhereParser