You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/03/15 10:28:34 UTC

[18/27] incubator-s2graph git commit: [S2GRAPH-57]: Change package names into org.apache.s2graph.

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
new file mode 100644
index 0000000..c1a5738
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -0,0 +1,567 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsNumber, Json}
+
+import scala.collection.JavaConversions._
+import scala.util.hashing.MurmurHash3
+
+
+/**
+  * Edge representation whose properties each carry their own timestamp, together with
+  * pending-edge, status and lock information.
+  *
+  * Construction fails with an `Exception` when `props` has no entry for
+  * `LabelMeta.timeStampSeq`.
+  *
+  * @param srcVertex      source vertex
+  * @param tgtVertex      target vertex
+  * @param labelWithDir   label id together with direction
+  * @param op             operation byte (rendered via `GraphUtil.fromOp` in log output)
+  * @param version        version number; also emitted as "version" by `propsWithName`
+  * @param props          property values with timestamps, keyed by meta sequence
+  * @param pendingEdgeOpt optional pending edge, passed through unchanged to `toEdge`
+  * @param statusCode     status byte, passed through unchanged to `toEdge`
+  * @param lockTs         optional lock timestamp, passed through unchanged to `toEdge`
+  */
+case class SnapshotEdge(srcVertex: Vertex,
+                        tgtVertex: Vertex,
+                        labelWithDir: LabelWithDirection,
+                        op: Byte,
+                        version: Long,
+                        props: Map[Byte, InnerValLikeWithTs],
+                        pendingEdgeOpt: Option[Edge],
+                        statusCode: Byte = 0,
+                        lockTs: Option[Long]) extends JSONParser {
+
+  // Plain Map#contains: a key lookup does not need the implicit Java-collection wrapper.
+  if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
+
+  val label = Label.findById(labelWithDir.labelId)
+  val schemaVer = label.schemaVersion
+  lazy val propsWithoutTs = props.mapValues(_.innerVal)
+  // Timestamp is parsed from the string form of the value stored under timeStampSeq.
+  val ts = props(LabelMeta.timeStampSeq).innerVal.toString().toLong
+
+  /** Builds an [[Edge]] with the same vertices, label, op, version, props and lock state. */
+  def toEdge: Edge =
+    Edge(srcVertex, tgtVertex, labelWithDir, op,
+      version, props, pendingEdgeOpt = pendingEdgeOpt,
+      statusCode = statusCode, lockTs = lockTs)
+
+  /**
+    * Properties as name -> JSON value, plus a "version" entry. Properties whose sequence is
+    * not in the label's meta map, or whose value cannot be converted, are left out.
+    */
+  def propsWithName = (for {
+    (seq, v) <- props
+    meta <- label.metaPropsMap.get(seq)
+    jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
+  } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
+
+  /** Tab-separated one-line rendering; only for debug. */
+  def toLogString() = {
+    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t")
+  }
+}
+
+/**
+  * Edge as laid out for one label index, identified by `labelIndexSeq`.
+  *
+  * Construction fails with an `Exception` when `props` has no entry for
+  * `LabelMeta.timeStampSeq`.
+  *
+  * @param srcVertex     source vertex
+  * @param tgtVertex     target vertex
+  * @param labelWithDir  label id together with direction
+  * @param op            operation byte (rendered via `GraphUtil.fromOp` in log output)
+  * @param version       version number; also the timestamp attached to every value by `propsWithTs`
+  * @param labelIndexSeq sequence of the label index used to resolve `labelIndex`
+  * @param props         property values keyed by meta sequence
+  */
+case class IndexEdge(srcVertex: Vertex,
+                     tgtVertex: Vertex,
+                     labelWithDir: LabelWithDirection,
+                     op: Byte,
+                     version: Long,
+                     labelIndexSeq: Byte,
+                     props: Map[Byte, InnerValLike]) extends JSONParser {
+  // Same kind of key lookup as the degreeSeq check below, so use Map#contains for both.
+  if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
+
+  val ts = props(LabelMeta.timeStampSeq).toString.toLong
+  val degreeEdge = props.contains(LabelMeta.degreeSeq)
+  lazy val label = Label.findById(labelWithDir.labelId)
+  // Note: this eager val forces the lazy `label` during construction.
+  val schemaVer = label.schemaVersion
+  lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
+
+  /** Default value of every sort-key meta of the index, keyed by meta sequence. */
+  lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta =>
+    val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
+    meta.seq -> innerVal
+  }.toMap
+
+  lazy val labelIndexMetaSeqs = labelIndex.metaSeqs
+
+  /**
+    * One (sequence -> value) pair per meta sequence of the index, in index order. A sequence
+    * missing from `props` falls back to `version` for `LabelMeta.timeStampSeq`, to the target
+    * vertex id for `LabelMeta.toSeq`, and to the index default otherwise;
+    * `LabelMeta.fromSeq` is rejected with a `RuntimeException`.
+    *
+    * TODO: make sure callers of this class fill props as this assumes
+    */
+  lazy val orders = for (k <- labelIndexMetaSeqs) yield {
+    props.get(k) match {
+      case None =>
+
+        /**
+          * TODO: ugly hack
+          * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
+          */
+        val v = k match {
+          case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer)
+          case LabelMeta.toSeq => tgtVertex.innerId
+          case LabelMeta.fromSeq => //srcVertex.innerId
+            // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data.
+            throw new RuntimeException("_from on indexProps is not supported")
+          case _ => defaultIndexMetas(k)
+        }
+
+        k -> v
+      case Some(v) => k -> v
+    }
+  }
+
+  /** Sequences that are part of the index ordering. */
+  lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
+
+  /** Properties that are not part of the index ordering. */
+  lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v
+
+  /** Every property paired with `version` as its timestamp. */
+  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
+
+  //TODO:
+  //  lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
+
+  // NOTE(review): `orders` yields one entry per element of `labelIndexMetaSeqs`, so this looks
+  // like it is always true whenever it evaluates - confirm the intended check.
+  lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
+
+  /**
+    * Properties with a non-negative sequence as name -> JSON value; entries unknown to the
+    * label's meta map, or not convertible, are left out.
+    */
+  def propsWithName = for {
+    (seq, v) <- props
+    meta <- label.metaPropsMap.get(seq) if seq >= 0
+    jsValue <- innerValToJsValue(v, meta.dataType)
+  } yield meta.name -> jsValue
+
+
+  /** Builds an [[Edge]] from this index edge, using `propsWithTs` as its properties. */
+  def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, propsWithTs)
+
+  /** Tab-separated one-line rendering; only for debug. */
+  def toLogString() = {
+    List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t")
+  }
+}
+
+/**
+  * Graph edge whose properties each carry a timestamp.
+  *
+  * Construction fails with an `Exception` when `propsWithTs` has no entry for
+  * `LabelMeta.timeStampSeq`. Equality and hash code only consider the source vertex id, the
+  * target vertex id and `labelWithDir`; properties, op and version are ignored.
+  *
+  * @param srcVertex       source vertex
+  * @param tgtVertex       target vertex
+  * @param labelWithDir    label id together with direction
+  * @param op              operation byte, defaults to `GraphUtil.defaultOpByte`
+  * @param version         version number, defaults to the current wall-clock millis
+  * @param propsWithTs     property values with timestamps, keyed by meta sequence
+  * @param parentEdges     parent edges with their scores
+  * @param originalEdgeOpt optional original edge
+  * @param pendingEdgeOpt  optional pending edge, forwarded to `toSnapshotEdge`
+  * @param statusCode      status byte, forwarded to `toSnapshotEdge`
+  * @param lockTs          optional lock timestamp, forwarded to `toSnapshotEdge`
+  */
+case class Edge(srcVertex: Vertex,
+                tgtVertex: Vertex,
+                labelWithDir: LabelWithDirection,
+                op: Byte = GraphUtil.defaultOpByte,
+                version: Long = System.currentTimeMillis(),
+                propsWithTs: Map[Byte, InnerValLikeWithTs],
+                parentEdges: Seq[EdgeWithScore] = Nil,
+                originalEdgeOpt: Option[Edge] = None,
+                pendingEdgeOpt: Option[Edge] = None,
+                statusCode: Byte = 0,
+                lockTs: Option[Long] = None) extends GraphElement with JSONParser {
+
+  // `props` has exactly the keys of `propsWithTs`, so check the constructor argument directly
+  // instead of building a mapped view (and a Java wrapper) just to test one key.
+  if (!propsWithTs.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
+
+  val schemaVer = label.schemaVersion
+  // Timestamp is parsed from the string form of the value stored under timeStampSeq.
+  val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+  /** Property values without their timestamps. Recomputed on every call. */
+  def props = propsWithTs.mapValues(_.innerVal)
+
+  /**
+    * For a directed label: this edge and its duplicate (source/target swapped, direction
+    * toggled). Otherwise: the edge forced to direction "out" and that edge with source and
+    * target swapped.
+    */
+  def relatedEdges = {
+    if (labelWithDir.isDirected) List(this, duplicateEdge)
+    else {
+      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
+      val base = copy(labelWithDir = outDir)
+      List(base, base.reverseSrcTgtEdge)
+    }
+  }
+
+  //    def relatedEdges = List(this)
+
+  /**
+    * Source-side vertex rebuilt with the label's column id: for direction "in" it is built
+    * from `tgtVertex` and the label's target column, otherwise from `srcVertex` and the
+    * label's source column.
+    */
+  def srcForVertex = {
+    val belongLabelIds = Seq(labelWithDir.labelId)
+    if (labelWithDir.dir == GraphUtil.directions("in")) {
+      Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+    } else {
+      Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+    }
+  }
+
+  /** Mirror image of [[srcForVertex]]: yields the opposite end of the edge. */
+  def tgtForVertex = {
+    val belongLabelIds = Seq(labelWithDir.labelId)
+    if (labelWithDir.dir == GraphUtil.directions("in")) {
+      Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+    } else {
+      Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+    }
+  }
+
+  /** Same edge with source/target swapped and direction toggled. */
+  def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
+
+  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
+
+  def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
+
+  /** Label looked up by id on every call. */
+  def label = Label.findById(labelWithDir.labelId)
+
+  /** All indices defined for this edge's label. */
+  def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
+
+  override def serviceName = label.serviceName
+
+  override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|")
+
+  override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|")
+
+  override def isAsync = label.isAsync
+
+  def isDegree = propsWithTs.contains(LabelMeta.degreeSeq)
+
+  /**
+    * `props`, with a timestamp entry added when it is missing. The constructor guard already
+    * requires the timestamp key, so the `None` branch is only a safety net.
+    */
+  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
+    case Some(_) => props
+    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
+  }
+
+  /** `propsPlusTs` restricted to non-negative sequences. */
+  def propsPlusTsValid = propsPlusTs.filter(kv => kv._1 >= 0)
+
+  /** One [[IndexEdge]] per label index, carrying all of `propsPlusTs`. */
+  def edgesWithIndex = for (labelOrder <- labelOrders) yield {
+    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTs)
+  }
+
+  /** One [[IndexEdge]] per label index, carrying only `propsPlusTsValid`. */
+  def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
+    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTsValid)
+  }
+
+  /** force direction as out on invertedEdge */
+  def toSnapshotEdge: SnapshotEdge = {
+    val (smaller, larger) = (srcForVertex, tgtForVertex)
+
+    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
+
+    // The default timestamp entry comes first so an entry already in propsWithTs overrides it.
+    val ret = SnapshotEdge(smaller, larger, newLabelWithDir, op, version,
+      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs,
+      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs)
+    ret
+  }
+
+  override def hashCode(): Int = {
+    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case e: Edge =>
+      srcVertex.innerId == e.srcVertex.innerId &&
+        tgtVertex.innerId == e.tgtVertex.innerId &&
+        labelWithDir == e.labelWithDir
+    case _ => false
+  }
+
+  /**
+    * Properties with a strictly positive sequence as name -> JSON value; entries unknown to
+    * the label's meta map, or not convertible, are left out.
+    */
+  def propsWithName = for {
+    (seq, v) <- props
+    meta <- label.metaPropsMap.get(seq) if seq > 0
+    jsValue <- innerValToJsValue(v, meta.dataType)
+  } yield meta.name -> jsValue
+
+  /**
+    * New edge pointing at a target vertex with the given inner id. Only vertices, label, op,
+    * version and props are carried over; the remaining fields take their defaults.
+    */
+  def updateTgtVertex(id: InnerValLike) = {
+    val newId = TargetVertexId(tgtVertex.id.colId, id)
+    val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props)
+    Edge(srcVertex, newTgtVertex, labelWithDir, op, version, propsWithTs)
+  }
+
+  /**
+    * Score of this edge for the given rank parameter: 1.0 when no keys are configured,
+    * otherwise the sum over the configured keys of 1 for `LabelMeta.countSeq` and
+    * `weight * value` for every other key present in `propsWithTs`. A value that cannot be
+    * parsed as a double is logged and counted as 1.0.
+    */
+  def rank(r: RankParam): Double =
+    if (r.keySeqAndWeights.size <= 0) 1.0
+    else {
+      var sum: Double = 0
+
+      for ((seq, w) <- r.keySeqAndWeights) {
+        seq match {
+          case LabelMeta.countSeq => sum += 1
+          case _ =>
+            propsWithTs.get(seq) match {
+              case None => // do nothing
+              case Some(innerValWithTs) =>
+                val cost = try innerValWithTs.innerVal.toString.toDouble catch {
+                  case e: Exception =>
+                    logger.error("toInnerval failed in rank", e)
+                    1.0
+                }
+                sum += w * cost
+            }
+        }
+      }
+      sum
+    }
+
+  /** Tab-separated one-line rendering; the JSON props column is omitted when there are none. */
+  def toLogString: String = {
+    val columns = List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label)
+    // propsWithName performs label lookups, so evaluate it once and reuse the result.
+    val named = propsWithName
+    val ret = if (named.nonEmpty) columns :+ Json.toJson(named) else columns
+
+    ret.mkString("\t")
+  }
+}
+
+/**
+  * Set of storage changes produced for one edge: index edges to delete, index edges to
+  * insert, and an optional new snapshot edge.
+  */
+case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
+                      edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
+                      newSnapshotEdge: Option[SnapshotEdge] = None) {
+
+  /** Multi-line rendering framed by two 50-dash rulers; one log line per contained edge. */
+  def toLogString: String = {
+    val ruler = "-" * 50
+    val deleteLines = edgesToDelete.map(_.toLogString()).mkString("\n")
+    val insertLines = edgesToInsert.map(_.toLogString()).mkString("\n")
+    val snapshotLine = newSnapshotEdge.fold("")(_.toLogString())
+
+    Seq(
+      "\n",
+      ruler,
+      s"deletes: $deleteLines",
+      s"inserts: $insertLines",
+      s"snapshot: $snapshotLine",
+      ruler,
+      "\n"
+    ).mkString("\n")
+  }
+}
+
+/**
+  * Merge logic that turns the stored state of an edge plus incoming requests into an
+  * [[EdgeMutate]].
+  */
+object Edge extends JSONParser {
+  val incrementVersion = 1L
+  val minTsVal = 0L
+
+  /** now version information is required also **/
+  type State = Map[Byte, InnerValLikeWithTs]
+  /** (old props, request props, request timestamp, schema version of the request edge) */
+  type PropsPairWithTs = (State, State, Long, String)
+  /** Merge function: yields the merged props and whether anything was replaced. */
+  type MergeState = PropsPairWithTs => (State, Boolean)
+  type UpdateFunc = (Option[Edge], Edge, MergeState)
+
+  /**
+    * True when `props` has a `lastDeletedAt` entry and no other property is newer than it.
+    * False when there is no `lastDeletedAt` entry.
+    */
+  def allPropsDeleted(props: Map[Byte, InnerValLikeWithTs]): Boolean =
+    props.get(LabelMeta.lastDeletedAt) match {
+      case None => false
+      case Some(deletedMarker) =>
+        val lastDeletedAt = deletedMarker.ts
+        val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt
+
+        propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
+    }
+
+  /**
+    * Mutation that deletes every valid index edge related to `requestEdge` and writes its
+    * snapshot edge. `invertedEdge` is not consulted.
+    */
+  def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = {
+    //    assert(invertedEdge.isEmpty)
+    //    assert(requestEdge.op == GraphUtil.operations("delete"))
+
+    val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+    val edgeInverted = Option(requestEdge.toSnapshotEdge)
+
+    (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, edgeInverted))
+  }
+
+  /**
+    * Applies the requests, oldest timestamp first, on top of the stored edge and builds the
+    * resulting mutation.
+    *
+    * Requests whose timestamp equals the stored edge's timestamp are skipped. When nothing is
+    * left, the first request and an empty mutation are returned, so `requestEdges` must not
+    * be empty.
+    *
+    * @param invertedEdge stored state of the edge, if any
+    * @param requestEdges incoming requests for the same edge
+    * @return the request with the largest timestamp together with the mutation to apply
+    * @throws RuntimeException for an unsupported operation, or a delete on a label whose
+    *                          consistency level is not "strong"
+    */
+  def buildOperation(invertedEdge: Option[Edge], requestEdges: Seq[Edge]): (Edge, EdgeMutate) = {
+    //            logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
+    //            logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
+    val oldPropsWithTs = invertedEdge.map(_.propsWithTs).getOrElse(Map.empty[Byte, InnerValLikeWithTs])
+
+    // Pick the merge function for every request from its operation and consistency level.
+    val funcs = requestEdges.map { edge =>
+      if (edge.op == GraphUtil.operations("insert")) {
+        edge.label.consistencyLevel match {
+          case "strong" => Edge.mergeUpsert _
+          case _ => Edge.mergeInsertBulk _
+        }
+      } else if (edge.op == GraphUtil.operations("insertBulk")) {
+        Edge.mergeInsertBulk _
+      } else if (edge.op == GraphUtil.operations("delete")) {
+        edge.label.consistencyLevel match {
+          case "strong" => Edge.mergeDelete _
+          case _ => throw new RuntimeException("not supported")
+        }
+      }
+      else if (edge.op == GraphUtil.operations("update")) Edge.mergeUpdate _
+      else if (edge.op == GraphUtil.operations("increment")) Edge.mergeIncrement _
+      else throw new RuntimeException(s"not supported operation on edge: $edge")
+    }
+
+    val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal)
+    val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts)
+
+    if (requestWithFuncs.isEmpty) {
+      (requestEdges.head, EdgeMutate())
+    } else {
+      val requestEdge = requestWithFuncs.last._1
+
+      // Thread the props through every merge function in timestamp order.
+      val mergedPropsWithTs = requestWithFuncs.foldLeft(oldPropsWithTs) { case (prev, (edge, func)) =>
+        func(prev, edge.propsWithTs, edge.ts, edge.schemaVer)._1
+      }
+
+      val requestTs = requestEdge.ts
+      /** version should be monotonically increasing so our RPC mutation should be applied safely */
+      val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
+      // Largest timestamp among the merged props and the request. Seeding the fold with
+      // requestTs keeps this safe when the merged props turn out to be empty.
+      val newTs = mergedPropsWithTs.values.foldLeft(requestTs) { (acc, v) => math.max(acc, v.ts) }
+      val propsWithTs = mergedPropsWithTs ++
+        Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs))
+      val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
+
+      //      logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
+      //      logger.error(s"$propsWithTs")
+      (requestEdge, edgeMutate)
+    }
+  }
+
+  /**
+    * Builds the mutation that moves the stored edge from `oldPropsWithTs` to `newPropsWithTs`.
+    *
+    *  - Props unchanged: empty mutation.
+    *  - Only `lastDeletedAt` was added: snapshot update only, no index changes.
+    *  - Otherwise: delete the old index edges (unless the stored op is already "delete"),
+    *    insert the new index edges (unless all props are deleted) and update the snapshot.
+    *
+    * @param snapshotEdgeOpt stored state of the edge, if any
+    * @param requestEdge     request used as the template for the new edges
+    * @param newVersion      version to stamp on the new edges
+    * @param oldPropsWithTs  props before the merge
+    * @param newPropsWithTs  props after the merge
+    */
+  def buildMutation(snapshotEdgeOpt: Option[Edge],
+                    requestEdge: Edge,
+                    newVersion: Long,
+                    oldPropsWithTs: Map[Byte, InnerValLikeWithTs],
+                    newPropsWithTs: Map[Byte, InnerValLikeWithTs]): EdgeMutate = {
+    if (oldPropsWithTs == newPropsWithTs) {
+      // all requests should be dropped. so empty mutation.
+      EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None)
+    } else {
+      val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAt)
+      // Keep the stored op when the stored edge holds a value newer than the request.
+      val newOp = snapshotEdgeOpt match {
+        case None => requestEdge.op
+        case Some(old) =>
+          val oldMaxTs = old.propsWithTs.map(_._2.ts).max
+          if (oldMaxTs > requestEdge.ts) old.op
+          else requestEdge.op
+      }
+
+      val newSnapshotEdgeOpt =
+        Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, version = newVersion).toSnapshotEdge)
+      // delete request must always update snapshot.
+      if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) {
+        // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
+        EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
+      } else {
+        val edgesToDelete = snapshotEdgeOpt match {
+          case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
+            snapshotEdge.copy(op = GraphUtil.defaultOpByte).
+              relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+          case _ => Nil
+        }
+
+        val edgesToInsert =
+          if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
+          else
+            requestEdge.copy(version = newVersion, propsWithTs = newPropsWithTs, op = GraphUtil.defaultOpByte).
+              relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
+
+        EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt)
+      }
+    }
+  }
+
+  /**
+    * Upsert merge. For a key present on both sides the value with the larger timestamp wins
+    * (the old one on a tie). A key only in the old props is kept when it is at least as new
+    * as the request or its sequence is negative, and dropped otherwise. A key only in the
+    * request is added when it is newer than `lastDeletedAt`.
+    */
+  def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+    var shouldReplace = false
+    val (oldPropsWithTs, propsWithTs, requestTs, _) = propsPairWithTs
+    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
+    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
+      propsWithTs.get(k) match {
+        case Some(newValWithTs) =>
+          assert(oldValWithTs.ts >= lastDeletedAt)
+          val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
+          else {
+            shouldReplace = true
+            newValWithTs
+          }
+          Some(k -> v)
+
+        case None =>
+          assert(oldValWithTs.ts >= lastDeletedAt)
+          if (oldValWithTs.ts >= requestTs || k < 0) Some(k -> oldValWithTs)
+          else {
+            shouldReplace = true
+            None
+          }
+      }
+    }
+    val existInNew =
+      for {
+        (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
+      } yield {
+        shouldReplace = true
+        Some(k -> newValWithTs)
+      }
+
+    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
+  }
+
+  /**
+    * Update merge. Like [[mergeUpsert]], except that a key only present in the old props is
+    * always kept.
+    */
+  def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+    var shouldReplace = false
+    val (oldPropsWithTs, propsWithTs, _, _) = propsPairWithTs
+    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
+    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
+      propsWithTs.get(k) match {
+        case Some(newValWithTs) =>
+          assert(oldValWithTs.ts >= lastDeletedAt)
+          val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
+          else {
+            shouldReplace = true
+            newValWithTs
+          }
+          Some(k -> v)
+        case None =>
+          // important: update need to merge previous valid values.
+          assert(oldValWithTs.ts >= lastDeletedAt)
+          Some(k -> oldValWithTs)
+      }
+    }
+    val existInNew = for {
+      (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
+    } yield {
+      shouldReplace = true
+      Some(k -> newValWithTs)
+    }
+
+    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
+  }
+
+  /**
+    * Increment merge. The timestamp key is merged by "newest wins". For any other key present
+    * on both sides, a newer request value is added to the old value while the old timestamp
+    * is kept. Keys only in the old props are kept; keys only in the request are added when
+    * newer than `lastDeletedAt`.
+    */
+  def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+    var shouldReplace = false
+    val (oldPropsWithTs, propsWithTs, _, _) = propsPairWithTs
+    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
+    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
+      propsWithTs.get(k) match {
+        case Some(newValWithTs) =>
+          if (k == LabelMeta.timeStampSeq) {
+            val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
+            else {
+              shouldReplace = true
+              newValWithTs
+            }
+            Some(k -> v)
+          } else {
+            if (oldValWithTs.ts >= newValWithTs.ts) {
+              Some(k -> oldValWithTs)
+            } else {
+              assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt)
+              shouldReplace = true
+              // incr(t0), incr(t2), d(t1) => deleted
+              Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts))
+            }
+          }
+
+        case None =>
+          assert(oldValWithTs.ts >= lastDeletedAt)
+          Some(k -> oldValWithTs)
+        //          if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None
+      }
+    }
+    val existInNew = for {
+      (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
+    } yield {
+      shouldReplace = true
+      Some(k -> newValWithTs)
+    }
+
+    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
+  }
+
+  /**
+    * Delete merge. `lastDeletedAt` becomes the later of the stored deletion time and the
+    * request time. The timestamp key moves forward to the request time when that is newer.
+    * Every other old property older than `lastDeletedAt` is dropped. The result always
+    * contains a `lastDeletedAt` entry; the request props themselves are not used.
+    */
+  def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+    var shouldReplace = false
+    // The fourth component is the schema version string handed to InnerValLikeWithTs.withLong.
+    val (oldPropsWithTs, _, requestTs, version) = propsPairWithTs
+    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match {
+      case Some(prevDeletedAt) =>
+        if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts
+        else {
+          shouldReplace = true
+          requestTs
+        }
+      case None =>
+        shouldReplace = true
+        requestTs
+    }
+    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
+      if (k == LabelMeta.timeStampSeq) {
+        if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs)
+        else {
+          shouldReplace = true
+          Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version))
+        }
+      } else {
+        if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs)
+        else {
+          shouldReplace = true
+          None
+        }
+      }
+    }
+    val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version))
+    ((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace)
+  }
+
+  /** Bulk-insert merge: the request props replace the old props unconditionally. */
+  def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
+    val (_, propsWithTs, _, _) = propsPairWithTs
+    (propsWithTs, true)
+  }
+
+  /** Parses an edge from its string form by delegating to `Graph.toEdge`. */
+  def fromString(s: String): Option[Edge] = Graph.toEdge(s)
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
new file mode 100644
index 0000000..d3177b8
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -0,0 +1,159 @@
+package org.apache.s2graph.core
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicLong
+
+import akka.actor._
+import akka.routing.{Broadcast, RoundRobinPool}
+import com.typesafe.config.Config
+import org.apache.kafka.clients.producer._
+
+import scala.concurrent.duration._
+
+/**
+  * Publishes failed mutations to Kafka through a pool of [[KafkaAggregatorActor]]s.
+  *
+  * Call `apply(config)` once to configure it. When no producer could be created, `enqueue`
+  * and `enqueues` do nothing.
+  */
+object ExceptionHandler {
+
+  var producer: Option[Producer[Key, Val]] = None
+  var properties: Option[Properties] = None
+  val numOfRoutees = 1
+  val actorSystem = ActorSystem("ExceptionHandler")
+  var routees: Option[ActorRef] = None
+  var shutdownTime = 1000.millis
+  var phase = "dev"
+
+  /**
+    * Topic that failed mutations are published to. Derived from the current `phase` on every
+    * access, so it reflects the phase set by `apply(config)` even if it was read earlier.
+    */
+  def failTopic: String = s"mutateFailed_${phase}"
+
+  /**
+    * Reads the Kafka settings and `phase` from `config`, creates the producer when
+    * "kafka.metadata.broker.list" is configured, and starts the routees.
+    *
+    * A non-fatal failure while creating the producer is logged and leaves `producer` empty.
+    */
+  def apply(config: Config) = {
+    properties =
+      if (config.hasPath("kafka.metadata.broker.list")) Option(kafkaConfig(config))
+      else None
+    phase = if (config.hasPath("phase")) config.getString("phase") else "dev"
+    producer = properties.flatMap { kafkaProps =>
+      try {
+        Option(new KafkaProducer[Key, Val](kafkaProps))
+      } catch {
+        // Running without a producer is allowed, but the reason must not be lost silently.
+        case scala.util.control.NonFatal(e) =>
+          actorSystem.log.error(e, "failed to create KafkaProducer; failed mutations will not be published")
+          None
+      }
+    }
+    init()
+  }
+
+  /** Actor `Props` for a [[KafkaAggregatorActor]] that sends through `producer`. */
+  def props(producer: Producer[Key, Val]) = Props(classOf[KafkaAggregatorActor], producer)
+
+  /** Starts the round-robin pool of routees when a producer is available. */
+  def init() = {
+    for {
+      p <- producer
+    } {
+      routees = Option(actorSystem.actorOf(RoundRobinPool(numOfRoutees).props(props(p))))
+    }
+  }
+
+  /** Sends a `PoisonPill` to every routee, then blocks the caller for `shutdownTime`. */
+  def shutdown() = {
+    routees.foreach(_ ! Broadcast(PoisonPill))
+    // toMillis honours the duration's unit; `length` is only the count in that unit.
+    Thread.sleep(shutdownTime.toMillis)
+  }
+
+  /** Enqueues every message in order. */
+  def enqueues(msgs: Seq[KafkaMessage]) = {
+    msgs.foreach(enqueue)
+  }
+
+  /** Hands the message to the routees; a no-op when none were started. */
+  def enqueue(msg: KafkaMessage) = {
+    routees.map(_ ! msg)
+  }
+
+
+  /**
+    * Producer properties: the broker list comes from "kafka.metadata.broker.list" (falling
+    * back to "localhost"); everything else is a fixed setting with String keys and values.
+    */
+  def kafkaConfig(config: Config) = {
+    val props = new Properties()
+
+    /** all default configuration for new producer */
+    val brokers =
+      if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
+      else "localhost"
+    props.put("bootstrap.servers", brokers)
+    props.put("acks", "1")
+    props.put("buffer.memory", "33554432")
+    props.put("compression.type", "snappy")
+    props.put("retries", "0")
+    props.put("batch.size", "16384")
+    props.put("linger.ms", "0")
+    props.put("max.request.size", "1048576")
+    props.put("receive.buffer.bytes", "32768")
+    props.put("send.buffer.bytes", "131072")
+    props.put("timeout.ms", "30000")
+    props.put("block.on.buffer.full", "false")
+    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    props
+  }
+
+  type Key = String
+  type Val = String
+
+  /**
+    * Wraps `element` in a [[KafkaMessage]] keyed by its `queuePartitionKey`. The payload is
+    * `originalString` when given, otherwise the element's log string.
+    */
+  def toKafkaMessage(topic: String = failTopic, element: GraphElement, originalString: Option[String] = None) = {
+    KafkaMessage(new ProducerRecord[Key, Val](topic, element.queuePartitionKey,
+      originalString.getOrElse(element.toLogString())))
+  }
+
+  case class KafkaMessage(msg: ProducerRecord[Key, Val])
+
+  case class Message(topic: String, msg: String)
+
+  case class BufferedKafkaMessage(msgs: Seq[ProducerRecord[Key, Val]], bufferSize: Int)
+
+  case class BufferedMessage(topic: String, bufferedMsgs: String, bufferSize: Int)
+
+  case object FlushBuffer
+
+  case class UpdateHealth(isHealty: Boolean)
+
+  case object ShowMetrics
+
+}
+
+/**
+  * Actor that sends [[ExceptionHandler.KafkaMessage]]s through `kafkaProducer`.
+  *
+  * A message whose `send` call throws is stashed and replayed after a later send succeeds.
+  * A message whose send completes with an error is re-queued to this actor. Counters are
+  * logged every 10 seconds.
+  *
+  * @param kafkaProducer producer used for every send
+  */
+class KafkaAggregatorActor(kafkaProducer: Producer[String, String]) extends Stash with ActorLogging {
+
+  import ExceptionHandler._
+
+  /** Sent to self from the producer callback so that unstashing happens inside `receive`. */
+  private case object SendSucceeded
+
+  val failedCount = new AtomicLong(0L)
+  val successCount = new AtomicLong(0L)
+  val stashCount = new AtomicLong(0L)
+
+  implicit val ex = context.system.dispatcher
+
+  // Kept so the periodic tick can be cancelled when the actor stops.
+  private val metricsTick: Cancellable = context.system.scheduler.schedule(0.millis, 10.seconds) {
+    self ! ShowMetrics
+  }
+
+  override def postStop(): Unit = {
+    metricsTick.cancel()
+    super.postStop()
+  }
+
+  override def receive = {
+    case ShowMetrics =>
+      log.info(s"[Stats]: failed[${failedCount.get}], stashed[${stashCount.get}], success[${successCount.get}]")
+
+    case SendSucceeded =>
+      // The stash is actor state, so it is only touched while processing a message.
+      unstashAll()
+      stashCount.set(0L)
+
+    case m: KafkaMessage =>
+      val replayTo = self
+      try {
+        kafkaProducer.send(m.msg, new Callback() {
+          // The producer invokes this outside the actor, so it only updates the atomic
+          // counters and sends messages; it never touches the stash directly.
+          override def onCompletion(meta: RecordMetadata, e: Exception): Unit = {
+            if (e == null) {
+              // success
+              successCount.incrementAndGet()
+              replayTo ! SendSucceeded
+            } else {
+              // failure
+              log.error(e, s"onCompletion: $e")
+              failedCount.incrementAndGet()
+              replayTo ! m
+            }
+          }
+        })
+      } catch {
+        // Covers BufferExhaustedException and any other non-fatal failure of send().
+        case scala.util.control.NonFatal(e) =>
+          log.error(e, s"$e")
+          log.info(s"stash")
+          stash()
+          stashCount.incrementAndGet()
+      }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
new file mode 100644
index 0000000..b2cbd19
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -0,0 +1,379 @@
+package org.apache.s2graph.core
+
+import java.util
+import java.util.concurrent.ConcurrentHashMap
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.s2graph.core.mysqls.{Label, Model}
+import org.apache.s2graph.core.parsers.WhereParser
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
+import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
+import org.apache.s2graph.core.utils.logger
+
+import scala.collection.JavaConversions._
+import scala.collection._
+import scala.collection.mutable.ListBuffer
+import scala.concurrent._
+import scala.util.Try
+
+object Graph {
+  /** Default score constant exposed by this object. */
+  val DefaultScore = 1.0
+
+  /**
+   * Built-in settings, keyed by configuration path.
+   * Numeric values are boxed explicitly because the map's value type is AnyRef.
+   */
+  private val DefaultConfigs: Map[String, AnyRef] = Map(
+    "hbase.zookeeper.quorum" -> "localhost",
+    "hbase.table.name" -> "s2graph",
+    "hbase.table.compression.algorithm" -> "gz",
+    "phase" -> "dev",
+    "db.default.driver" -> "com.mysql.jdbc.Driver",
+    "db.default.url" -> "jdbc:mysql://localhost:3306/graph_dev",
+    "db.default.password" -> "graph",
+    "db.default.user" -> "graph",
+    "cache.max.size" -> java.lang.Integer.valueOf(10000),
+    "cache.ttl.seconds" -> java.lang.Integer.valueOf(60),
+    "hbase.client.retries.number" -> java.lang.Integer.valueOf(20),
+    "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort),
+    "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000),
+    "max.retry.number" -> java.lang.Integer.valueOf(100),
+    "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10),
+    "max.back.off" -> java.lang.Integer.valueOf(100),
+    "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1),
+    "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000),
+    "future.cache.max.size" -> java.lang.Integer.valueOf(100000),
+    "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
+    "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
+    "s2graph.storage.backend" -> "hbase"
+  )
+
+  /**
+   * DefaultConfigs parsed into a Config object.
+   * NOTE(review): declared as a var, so it can be reassigned from outside this object.
+   */
+  var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
+
+  /** helpers for filterEdges */
+  type HashKey = (Int, Int, Int, Int, Boolean)
+  type FilterHashKey = (Int, Int)
+  type Result = (ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
+    ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
+    ListBuffer[(HashKey, FilterHashKey, Edge, Double)])
+
+  /**
+   * Derives the two keys used while filtering and aggregating edges.
+   *
+   * @param queryParam not used by the computation
+   * @param edge       edge whose source/target vertex ids and label/direction are hashed
+   * @param isDegree   copied verbatim into the first key
+   * @return (source hash, label id, direction, target hash, isDegree) paired with
+   *         (source hash, target hash)
+   */
+  def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
+    val srcHash = edge.srcVertex.innerId.hashCode()
+    val tgtHash = edge.tgtVertex.innerId.hashCode()
+    val labelWithDir = edge.labelWithDir
+
+    val aggregationKey: HashKey = (srcHash, labelWithDir.labelId, labelWithDir.dir, tgtHash, isDegree)
+    val vertexPairKey: FilterHashKey = (srcHash, tgtHash)
+
+    aggregationKey -> vertexPairKey
+  }
+
+  /**
+   * Collects, for every edge in the given results, the vertex on the far side of the edge
+   * (target for an "out" edge, source otherwise) together with the edge's label/direction.
+   *
+   * @return a map whose keys are the (label with direction, vertex) pairs; every value is `true`
+   */
+  def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): Map[(LabelWithDirection, Vertex), Boolean] = {
+    val visited = queryResultLs.flatMap { queryResult =>
+      queryResult.edgeWithScoreLs.map { edgeWithScore =>
+        val edge = edgeWithScore.edge
+        val isOut = edge.labelWithDir.dir == GraphUtil.directions("out")
+        val reached = if (isOut) edge.tgtVertex else edge.srcVertex
+        (edge.labelWithDir, reached) -> true
+      }
+    }
+
+    visited.toMap
+  }
+
+  /** common methods for filter out, transform, aggregate queryResult */
+
+  /**
+   * Runs the query's transformer on `edge`.
+   * The transformer is always invoked; its output is dropped entirely when `edge` is a degree edge.
+   */
+  def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = {
+    val transformed = queryParam.transformer.transform(edge, nextStepOpt)
+    transformed.filter(_ => !edge.isDegree)
+  }
+
+  /**
+   * Computes the time-decay factor for `edge`.
+   *
+   * Returns 1.0 when the query has no time decay configured. Otherwise the reference time is
+   * read from the edge property named by the decay setting, and the decay function is applied
+   * to `queryParam.timestamp - referenceTime`.
+   *
+   * The edge's own `ts` is used as the reference time when the property is absent or when
+   * reading/parsing it throws an Exception (which is logged).
+   */
+  def processTimeDecay(queryParam: QueryParam, edge: Edge) = {
+    /** process time decay */
+    val tsVal = queryParam.timeDecay match {
+      case None => 1.0
+      case Some(timeDecay) =>
+        // Note: this inner tsVal (the reference time) shadows the outer tsVal (the decay factor).
+        val tsVal = try {
+          val labelMeta = edge.label.metaPropsMap(timeDecay.labelMetaSeq)
+          val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMetaSeq)
+          innerValWithTsOpt.map { innerValWithTs =>
+            val innerVal = innerValWithTs.innerVal
+            labelMeta.dataType match {
+              case InnerVal.LONG => innerVal.value match {
+                case n: BigDecimal => n.bigDecimal.longValue()
+                case _ => innerVal.toString().toLong
+              }
+              // Any other data type is parsed from its string form.
+              case _ => innerVal.toString().toLong
+            }
+          } getOrElse(edge.ts)
+        } catch {
+          case e: Exception =>
+            logger.error(s"processTimeDecay error. ${edge.toLogString}", e)
+            edge.ts
+        }
+        val timeDiff = queryParam.timestamp - tsVal
+        timeDecay.decay(timeDiff)
+    }
+
+    tsVal
+  }
+
+  /**
+   * Records `convertedEdge` with `newScore` in the per-query accumulators, applying the
+   * query's duplicate policy when an entry with the same `hashKey` already exists.
+   *
+   * All three collections are mutated in place:
+   *  - `resultEdges`: one entry per hashKey holding the kept edge and its (aggregated) score
+   *  - `duplicateEdges`: extra edges kept under the Raw policy
+   *  - `edgeWithScoreSorted`: edges in the order they were first recorded
+   *
+   * When the label's consistency level is "strong", or the key is new, the edge is simply
+   * stored and appended. Note that in the "strong" case a repeated key overwrites the entry in
+   * `resultEdges` and is appended to `edgeWithScoreSorted` again.
+   */
+  def aggregateScore(newScore: Double,
+                     resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)],
+                     duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]],
+                     edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, Edge, Double)],
+                     hashKey: HashKey,
+                     filterHashKey: FilterHashKey,
+                     queryParam: QueryParam,
+                     convertedEdge: Edge) = {
+
+    /** skip duplicate policy check if consistencyLevel is strong */
+    if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) {
+      // oldFilterHashKey is not used below; the stored entry is rewritten with the new filterHashKey.
+      val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey)
+      //TODO:
+      queryParam.duplicatePolicy match {
+        case Query.DuplicatePolicy.First => // do nothing
+        case Query.DuplicatePolicy.Raw =>
+          // Keep every duplicate separately, alongside the first edge stored in resultEdges.
+          if (duplicateEdges.containsKey(hashKey)) {
+            duplicateEdges.get(hashKey).append(convertedEdge -> newScore)
+          } else {
+            val newBuffer = new ListBuffer[(Edge, Double)]
+            newBuffer.append(convertedEdge -> newScore)
+            duplicateEdges.put(hashKey, newBuffer)
+          }
+        case Query.DuplicatePolicy.CountSum =>
+          // Keep the first edge and add 1 to its score for each duplicate.
+          resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1))
+        case _ =>
+          // Every other policy: keep the first edge and add the duplicate's score.
+          resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + newScore))
+      }
+    } else {
+      resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore))
+      edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, newScore))
+    }
+  }
+
+  /**
+   * Builds the final result for one query request from its accumulators.
+   *
+   * Edges are emitted in the order recorded in the accumulator's sorted buffer. An edge is
+   * dropped only when its vertex-pair key is in `edgesToExclude` and not in `edgesToInclude`.
+   * Each surviving edge is emitted with its aggregated score from the result map, followed by
+   * any duplicates stored for the same key; entries scoring below `queryParam.threshold`
+   * are removed.
+   *
+   * @return the original request paired with a new QueryResult holding the selected edges
+   */
+  def aggregateResults(queryRequestWithResult: QueryRequestWithResult,
+                       queryParamResult: Result,
+                       edgesToInclude: util.HashSet[FilterHashKey],
+                       edgesToExclude: util.HashSet[FilterHashKey]): QueryRequestWithResult = {
+    // Only queryRequest and queryParam are used below; the other extracted values are ignored.
+    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+    val (query, stepIdx, _, queryParam) = QueryRequest.unapply(queryRequest).get
+
+    val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult
+    val edgesWithScores = for {
+      (hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey)
+      // Use the aggregated score from resultEdges, not the score recorded at first insertion.
+      score = resultEdges.get(hashKey)._3
+      (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold
+    } yield EdgeWithScore(duplicateEdge, aggregatedScore)
+
+    QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores))
+  }
+
+  /**
+   * Returns `edge -> score` followed by every duplicate stored under `hashKey`
+   * (nothing extra when the key has no stored duplicates).
+   */
+  def fetchDuplicatedEdges(edge: Edge,
+                           score: Double,
+                           hashKey: HashKey,
+                           duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]) = {
+    (edge -> score) +: (if (duplicateEdges.containsKey(hashKey)) duplicateEdges.get(hashKey) else Seq.empty)
+  }
+
+  /**
+   * Returns the edges of the given result, restricted by the query's where clause.
+   *
+   * When the where clause equals `WhereParser.success` the edges are returned untouched;
+   * otherwise a lazily filtered view (`withFilter`) is returned.
+   * Note: `queryParam.where.get` throws if the where clause is not available.
+   */
+  def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = {
+    val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+    val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get
+    val whereFilter = queryParam.where.get
+    if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs
+    else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => whereFilter.filter(edgeWithScore.edge))
+  }
+
+  /**
+   * Post-processes the raw results of one query step: applies where filters, edge
+   * transformation, label weights, time decay, duplicate policy, include/exclude and threshold.
+   *
+   * The step (and the following step, if any) is taken from the FIRST element of the result
+   * list. An empty result list yields Nil.
+   *
+   * Processing happens in two passes:
+   *  1. every result is scanned and its edges are accumulated per result, while the
+   *     exclude/include vertex-pair sets are filled across ALL results;
+   *  2. every result is finalised with `aggregateResults`, using those shared sets.
+   *
+   * @param alreadyVisited not used by this method
+   */
+  def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]],
+                  alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean])
+                 (implicit ec: scala.concurrent.ExecutionContext): Future[Seq[QueryRequestWithResult]] = {
+
+    queryResultLsFuture.map { queryRequestWithResultLs =>
+      if (queryRequestWithResultLs.isEmpty) Nil
+      else {
+        val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get
+        val (q, stepIdx, srcVertex, queryParam) = QueryRequest.unapply(queryRequest).get
+        val step = q.steps(stepIdx)
+
+        val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None
+
+        // (label id, direction) pairs of the step's query params flagged exclude / include.
+        val excludeLabelWithDirSet = new util.HashSet[(Int, Int)]
+        val includeLabelWithDirSet = new util.HashSet[(Int, Int)]
+        step.queryParams.filter(_.exclude).foreach(l => excludeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
+        step.queryParams.filter(_.include).foreach(l => includeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir))
+
+        // Vertex-pair keys collected across all results; consumed by aggregateResults below.
+        val edgesToExclude = new util.HashSet[FilterHashKey]()
+        val edgesToInclude = new util.HashSet[FilterHashKey]()
+
+        val queryParamResultLs = new ListBuffer[Result]
+        queryRequestWithResultLs.foreach { queryRequestWithResult =>
+          // These shadow the values extracted from the head element above.
+          val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+          val queryParam = queryRequest.queryParam
+          val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]()
+          val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)]()
+          val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, Edge, Double)]
+          val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0)
+
+          // store degree value with Array.empty so if degree edge exist, it comes at very first.
+          def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { edgeWithScore =>
+            edgeWithScore.edge.isDegree
+          }
+          // True only while processing the first edge, and only if that edge is a degree edge.
+          var isDegree = checkDegree()
+
+          val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir
+          val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey)
+          val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey)
+
+          queryResultWithFilter(queryRequestWithResult).foreach { edgeWithScore =>
+            val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+            if (queryParam.transformer.isDefault) {
+              // Default transformer: the edge is used as is (no convertEdges call).
+              val convertedEdge = edge
+
+              val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+
+              /** check if this edge should be excluded. */
+              if (shouldBeExcluded && !isDegree) {
+                edgesToExclude.add(filterHashKey)
+              } else {
+                if (shouldBeIncluded && !isDegree) {
+                  edgesToInclude.add(filterHashKey)
+                }
+                val tsVal = processTimeDecay(queryParam, convertedEdge)
+                val newScore = labelWeight * score * tsVal
+                aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+              }
+            } else {
+              // Custom transformer: the same handling is applied to each converted edge.
+              convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge =>
+                val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree)
+
+                /** check if this edge should be excluded. */
+                if (shouldBeExcluded && !isDegree) {
+                  edgesToExclude.add(filterHashKey)
+                } else {
+                  if (shouldBeIncluded && !isDegree) {
+                    edgesToInclude.add(filterHashKey)
+                  }
+                  val tsVal = processTimeDecay(queryParam, convertedEdge)
+                  val newScore = labelWeight * score * tsVal
+                  aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge)
+                }
+              }
+            }
+            isDegree = false
+          }
+          val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted)
+          queryParamResultLs.append(ret)
+        }
+
+        // Second pass: finalise each result now that the include/exclude sets are complete.
+        val aggregatedResults = for {
+          (queryRequestWithResult, queryParamResult) <- queryRequestWithResultLs.zip(queryParamResultLs)
+        } yield {
+            aggregateResults(queryRequestWithResult, queryParamResult, edgesToInclude, edgesToExclude)
+          }
+
+        aggregatedResults
+      }
+    }
+  }
+
+  /**
+   * Parses one tab-separated log line into an edge or a vertex, depending on its third column
+   * ("edge"/"e" or "vertex"/"v").
+   *
+   * @param s            the log line
+   * @param labelMapping for edges only: if the label column (index 5) is a key of this map,
+   *                     it is replaced by the mapped value before parsing
+   * @return the parsed element, or None when an Exception occurred (the exception is logged)
+   */
+  def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = {
+    val parsed = Try {
+      val parts = GraphUtil.split(s)
+      parts(2) match {
+        case "edge" | "e" =>
+          /** current only edge is considered to be bulk loaded */
+          labelMapping.get(parts(5)).foreach { toReplace => parts(5) = toReplace }
+          toEdge(parts)
+        case "vertex" | "v" =>
+          toVertex(parts)
+        case _ =>
+          throw new GraphExceptions.JsonParseException("log type is not exist in log.")
+      }
+    }
+
+    parsed.recover {
+      case e: Exception =>
+        logger.error(s"$e", e)
+        None
+    }.get
+  }
+
+
+  /** Splits the tab-separated line `s` and parses the columns as a vertex. */
+  def toVertex(s: String): Option[Vertex] = {
+    val columns = GraphUtil.split(s)
+    toVertex(columns)
+  }
+
+  /** Splits the tab-separated line `s` and parses the columns as an edge. */
+  def toEdge(s: String): Option[Edge] = {
+    val columns = GraphUtil.split(s)
+    toEdge(columns)
+  }
+
+  //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}"
+  //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431},
+  /**
+   * Builds an edge from already split columns:
+   * timestamp, operation, log type, source id, target id, label, [props json], [direction].
+   *
+   * Props default to "{}"; the direction defaults to "out" and any value other than
+   * "out"/"in" is also treated as "out".
+   *
+   * @return Some(edge); failures are not turned into None: an Exception is logged and rethrown
+   */
+  def toEdge(parts: Array[String]): Option[Edge] =
+    try {
+      // The log-type column is read (so a short line fails the same way) but not used here.
+      val (ts, operation, _, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+      val props = if (parts.length >= 7) parts(6) else "{}"
+      val direction =
+        if (parts.length >= 8) parts(7) match {
+          case dir @ ("out" | "in") => dir
+          case _ => "out"
+        }
+        else "out"
+
+      //            logger.debug(s"toEdge: $edge")
+      Some(Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props))
+    } catch {
+      case e: Exception =>
+        logger.error(s"toEdge: $e", e)
+        throw e
+    }
+
+  //"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}"
+  /**
+   * Builds a vertex from already split columns:
+   * timestamp, operation, log type, vertex id, service name, column name, [props json].
+   *
+   * Props default to "{}".
+   *
+   * @return Some(vertex); failures are not turned into None: any non-fatal error is logged
+   *         and rethrown
+   */
+  def toVertex(parts: Array[String]): Option[Vertex] =
+    try {
+      // The log-type column is read (so a short line fails the same way) but not used here.
+      val (ts, operation, _, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
+      val props = if (parts.length >= 7) parts(6) else "{}"
+      Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props))
+    } catch {
+      case scala.util.control.NonFatal(e) =>
+        logger.error(s"toVertex: $e", e)
+        throw e
+    }
+
+  /**
+   * Creates the storage backend named by the "s2graph.storage.backend" setting.
+   * Only "hbase" is supported; any other value raises a RuntimeException.
+   */
+  def initStorage(config: Config)(ec: ExecutionContext) = {
+    val backend = config.getString("s2graph.storage.backend")
+    if (backend == "hbase") new AsynchbaseStorage(config)(ec)
+    else throw new RuntimeException("not supported storage.")
+  }
+}
+
+/**
+ * Facade over the configured storage backend.
+ *
+ * On construction it completes the given config with `Graph.DefaultConfig`, initialises the
+ * `Model` layer and its cache, creates the storage, and logs the effective value of every
+ * setting that has a built-in default. All query and mutation methods delegate to the storage.
+ */
+class Graph(_config: Config)(implicit val ec: ExecutionContext) {
+  // Settings missing from the caller's config fall back to the built-in defaults.
+  val config = _config.withFallback(Graph.DefaultConfig)
+
+  Model.apply(config)
+  Model.loadCache()
+
+  // TODO: Make storage client by config param
+  val storage = Graph.initStorage(config)(ec)
+
+
+  config.entrySet().foreach { entry =>
+    val key = entry.getKey
+    if (Graph.DefaultConfigs.contains(key)) {
+      logger.info(s"[Initialized]: $key, ${this.config.getAnyRef(key)}")
+    }
+  }
+
+  /** select */
+  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] =
+    storage.checkEdges(params)
+
+  def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] =
+    storage.getEdges(q)
+
+  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] =
+    storage.getVertices(vertices)
+
+  /** write */
+  def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
+    storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
+
+  def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[Boolean]] =
+    storage.mutateElements(elements, withWait)
+
+  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] =
+    storage.mutateEdges(edges, withWait)
+
+  def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] =
+    storage.mutateVertices(vertices, withWait)
+
+  def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] =
+    storage.incrementCounts(edges, withWait)
+
+  /** Flushes the storage, then shuts the Model layer down. */
+  def shutdown(): Unit = {
+    storage.flush()
+    Model.shutdown()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala
new file mode 100644
index 0000000..12bb941
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala
@@ -0,0 +1,10 @@
+package org.apache.s2graph.core
+
+/**
+ * Common contract of the elements handled by the graph (this file uses it as the shared
+ * result type of edge and vertex parsing, and as the element type of `mutateElements`).
+ */
+trait GraphElement {
+  /** Name of the service this element belongs to. */
+  def serviceName: String
+  /** Timestamp of this element. */
+  def ts: Long
+  // NOTE(review): presumably tells whether the element is written asynchronously; confirm with implementations.
+  def isAsync: Boolean
+  // NOTE(review): queueKey / queuePartitionKey look like message-queue routing keys; confirm with implementations.
+  def queueKey: String
+  def queuePartitionKey: String
+  /** String form of this element used in log output. */
+  def toLogString(): String
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
new file mode 100644
index 0000000..e7d2d76
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala
@@ -0,0 +1,28 @@
+package org.apache.s2graph.core
+
+/**
+ * Exception types used across the core module.
+ *
+ * Every type is a case class extending Exception and carrying a message;
+ * WhereParserException and BadQueryException additionally accept an optional cause.
+ */
+object GraphExceptions {
+
+  case class JsonParseException(msg: String) extends Exception(msg)
+
+  case class LabelNotExistException(msg: String) extends Exception(msg)
+
+  case class ModelNotFoundException(msg: String) extends Exception(msg)
+
+  case class MaxPropSizeReachedException(msg: String) extends Exception(msg)
+
+  case class LabelAlreadyExistException(msg: String) extends Exception(msg)
+
+  case class InternalException(msg: String) extends Exception(msg)
+
+  case class IllegalDataTypeException(msg: String) extends Exception(msg)
+
+  // `ex` is passed to Exception as the cause; it defaults to null (no cause).
+  case class WhereParserException(msg: String, ex: Exception = null) extends Exception(msg, ex)
+
+  // `ex` is passed to Exception as the cause; it defaults to null (no cause).
+  case class BadQueryException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
+
+  case class InvalidHTableException(msg: String) extends Exception(msg)
+
+  case class FetchTimeoutException(msg: String) extends Exception(msg)
+
+  case class DropRequestException(msg: String) extends Exception(msg)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
new file mode 100644
index 0000000..1a2b916
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala
@@ -0,0 +1,139 @@
+package org.apache.s2graph.core
+
+import java.util.regex.Pattern
+
+import play.api.libs.json.Json
+
+import scala.util.hashing.MurmurHash3
+
+/**
+ * Stateless helpers: operation / direction code tables, string hashing and line splitting.
+ */
+object GraphUtil {
+  // Columns of a log line are separated by a single tab.
+  private val TOKEN_DELIMITER = Pattern.compile("[\t]")
+
+  /** Operation name (short or long form) -> operation code as a Byte. */
+  val operations = Map("i" -> 0, "insert" -> 0, "u" -> 1, "update" -> 1,
+    "increment" -> 2, "d" -> 3, "delete" -> 3,
+    "deleteAll" -> 4, "insertBulk" -> 5, "incrementCount" -> 6).map {
+    case (k, v) =>
+      k -> v.toByte
+  }
+  val BitsForMurMurHash = 16
+  val bytesForMurMurHash = 2
+  val defaultOpByte = operations("insert")
+
+  /** Direction name -> direction code (0 = out/directed, 1 = in, 2 = undirected). */
+  val directions = Map("out" -> 0, "in" -> 1, "undirected" -> 2, "u" -> 2, "directed" -> 0, "d" -> 0)
+  val consistencyLevel = Map("weak" -> 0, "strong" -> 1)
+
+  /**
+   * Normalises an element type name ("e"/"edge" or "v"/"vertex", case-insensitive, trimmed)
+   * to "edge" or "vertex". Any other input fails with a MatchError.
+   */
+  def toType(t: String) = {
+    t.trim().toLowerCase match {
+      case "e" | "edge" => "edge"
+      case "v" | "vertex" => "vertex"
+    }
+  }
+
+  /** Direction name (case-insensitive, trimmed) -> direction code, or None if unknown. */
+  def toDir(direction: String): Option[Byte] = {
+    val d = direction.trim().toLowerCase match {
+      case "directed" | "d" => Some(0)
+      case "undirected" | "u" => Some(2)
+      case "out" => Some(0)
+      case "in" => Some(1)
+      case _ => None
+    }
+    d.map(x => x.toByte)
+  }
+
+  /**
+   * Direction name (case-insensitive, trimmed) -> direction code.
+   * Unlike toDir, an unknown name is mapped to 2 (undirected) instead of being rejected.
+   */
+  def toDirection(direction: String): Int = {
+    direction.trim().toLowerCase match {
+      case "directed" | "d" => 0
+      case "undirected" | "u" => 2
+      case "out" => 0
+      case "in" => 1
+      case _ => 2
+    }
+  }
+
+  /** Direction code -> name. Any code other than 0, 1 or 2 fails with a MatchError. */
+  def fromDirection(direction: Int) = {
+    direction match {
+      case 0 => "out"
+      case 1 => "in"
+      case 2 => "undirected"
+    }
+  }
+
+  /** Swaps out (0) and in (1); undirected (2) stays as it is. */
+  def toggleDir(dir: Int) = {
+    dir match {
+      case 0 => 1
+      case 1 => 0
+      case 2 => 2
+      case _ => throw new UnsupportedOperationException(s"toggleDirection: $dir")
+    }
+  }
+
+  /** Operation name (trimmed, case-sensitive) -> operation code, or None if unknown. */
+  def toOp(op: String): Option[Byte] = {
+    op.trim() match {
+      case "i" | "insert" => Some(0)
+      case "d" | "delete" => Some(3)
+      case "u" | "update" => Some(1)
+      case "increment" => Some(2)
+      case "deleteAll" => Some(4)
+      case "insertBulk" => Some(5)
+      case "incrementCount" => Some(6)
+      case _ => None
+    }
+  }
+
+  /**
+   * Operation code -> long operation name (inverse of toOp).
+   *
+   * @throws UnsupportedOperationException for a code outside 0..6
+   */
+  def fromOp(op: Byte): String = {
+    op match {
+      case 0 => "insert"
+      case 3 => "delete"
+      case 1 => "update"
+      case 2 => "increment"
+      case 4 => "deleteAll"
+      case 5 => "insertBulk"
+      case 6 => "incrementCount"
+      case _ =>
+        // The message lists the codes exactly as mapped above.
+        throw new UnsupportedOperationException(
+          s"op : $op (only support 0(insert),1(update),2(increment),3(delete),4(deleteAll),5(insertBulk),6(incrementCount))")
+    }
+  }
+
+  //  def toggleOp(op: Byte): Byte = {
+  //    val ret = op match {
+  //      case 0 => 1
+  //      case 1 => 0
+  //      case x: Byte => x
+  //    }
+  //    ret.toByte
+  //  }
+  // 2^31 - 1
+
+  /** Maps any Int to a non-negative Int: negative h becomes -(h + 1), others are unchanged. */
+  def transformHash(h: Int): Int = {
+    //    h / 2 + (Int.MaxValue / 2 - 1)
+    if (h < 0) -1 * (h + 1) else h
+  }
+
+  /** Non-negative 32-bit MurmurHash3 of `s`. */
+  def murmur3Int(s: String): Int = {
+    val hash = MurmurHash3.stringHash(s)
+    transformHash(hash)
+  }
+
+  /** Non-negative Short hash of `s`: the non-negative MurmurHash3 shifted right by 16 bits. */
+  def murmur3(s: String): Short = {
+    val hash = MurmurHash3.stringHash(s)
+    val positiveHash = transformHash(hash) >> BitsForMurMurHash
+    positiveHash.toShort
+//    Random.nextInt(Short.MaxValue).toShort
+  }
+
+  /**
+   * Trims `s` and splits it on `delemiter`; a blank input yields an empty Seq.
+   * The delimiter is handed to String.split, so it is interpreted as a regular expression.
+   */
+  def smartSplit(s: String, delemiter: String) = {
+    val trimed_string = s.trim()
+    if (trimed_string.equals("")) {
+      Seq[String]()
+    } else {
+      trimed_string.split(delemiter).toList
+    }
+  }
+
+  /** Splits a log line into its tab-separated columns. */
+  def split(line: String) = TOKEN_DELIMITER.split(line)
+
+  /**
+   * Turns a payload into a list of lines: input starting with "[" is parsed as a JSON array
+   * of strings (empty list if it is not one); anything else is split on newlines.
+   */
+  def parseString(s: String): List[String] = {
+    if (!s.startsWith("[")) {
+      s.split("\n").toList
+    } else {
+      Json.parse(s).asOpt[List[String]].getOrElse(List.empty[String])
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
new file mode 100644
index 0000000..8b9a228
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -0,0 +1,134 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json._
+
+
+/**
+ * Conversions between JSON values / raw strings and the internal value representation
+ * (InnerValLike), driven by the declared data type of a property.
+ */
+trait JSONParser {
+
+  //TODO: check result notation on bigDecimal.
+  /**
+   * Converts an inner value to JSON according to `dataType`.
+   *
+   * Strings and booleans are cast directly. Numeric types are converted based on the runtime
+   * type of the stored value; Double and BigDecimal values are narrowed to the declared type
+   * (integral types drop the fraction).
+   *
+   * @return the JSON value, or None when the conversion throws an Exception
+   *         (unknown data type, unexpected runtime type, failed cast); the failure is logged
+   */
+  def innerValToJsValue(innerVal: InnerValLike, dataType: String): Option[JsValue] = {
+    try {
+      val dType = InnerVal.toInnerDataType(dataType)
+      val jsValue = dType match {
+        case InnerVal.STRING => JsString(innerVal.value.asInstanceOf[String])
+        case InnerVal.BOOLEAN => JsBoolean(innerVal.value.asInstanceOf[Boolean])
+        case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+          //        case t if InnerVal.NUMERICS.contains(t) =>
+          innerVal.value match {
+            case l: Long => JsNumber(l)
+            case i: Int => JsNumber(i)
+            case s: Short => JsNumber(s.toLong)
+            case b: Byte => JsNumber(b.toLong)
+            case f: Float => JsNumber(f.toDouble)
+            case d: Double =>
+              //              JsNumber(d)
+              dType match {
+                case InnerVal.BYTE => JsNumber(d.toInt)
+                case InnerVal.SHORT => JsNumber(d.toInt)
+                case InnerVal.INT => JsNumber(d.toInt)
+                case InnerVal.LONG => JsNumber(d.toLong)
+                case InnerVal.FLOAT => JsNumber(d.toDouble)
+                case InnerVal.DOUBLE => JsNumber(d.toDouble)
+                case _ => throw new RuntimeException(s"$innerVal, $dType => $dataType")
+              }
+            case num: BigDecimal =>
+              //              JsNumber(num)
+              //              JsNumber(InnerVal.scaleNumber(num.asInstanceOf[BigDecimal], dType))
+              dType match {
+                case InnerVal.BYTE => JsNumber(num.toInt)
+                case InnerVal.SHORT => JsNumber(num.toInt)
+                case InnerVal.INT => JsNumber(num.toInt)
+                case InnerVal.LONG => JsNumber(num.toLong)
+                case InnerVal.FLOAT => JsNumber(num.toDouble)
+                case InnerVal.DOUBLE => JsNumber(num.toDouble)
+                case _ => throw new RuntimeException(s"$innerVal, $dType => $dataType")
+              }
+            //              JsNumber(num.toLong)
+            case _ => throw new RuntimeException(s"$innerVal, Numeric Unknown => $dataType")
+          }
+        //          JsNumber(InnerVal.scaleNumber(innerVal.asInstanceOf[BigDecimal], dType))
+        case _ => throw new RuntimeException(s"$innerVal, Unknown => $dataType")
+      }
+      Some(jsValue)
+    } catch {
+      case e: Exception =>
+        logger.info(s"JSONParser.innerValToJsValue: $e")
+        None
+    }
+  }
+
+  //  def innerValToString(innerVal: InnerValLike, dataType: String): String = {
+  //    val dType = InnerVal.toInnerDataType(dataType)
+  //    InnerVal.toInnerDataType(dType) match {
+  //      case InnerVal.STRING => innerVal.toString
+  //      case InnerVal.BOOLEAN => innerVal.toString
+  //      //      case t if InnerVal.NUMERICS.contains(t)  =>
+  //      case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+  //        BigDecimal(innerVal.toString).bigDecimal.toPlainString
+  //      case _ => innerVal.toString
+  //      //        throw new RuntimeException("innerVal to jsValue failed.")
+  //    }
+  //  }
+
+  /**
+   * Parses a raw string into an inner value of the given data type.
+   *
+   * One pair of surrounding double quotes is stripped first. Unlike the JSON-based
+   * conversions of this trait, failures are NOT swallowed: an unparsable number or boolean,
+   * or an unsupported data type, throws.
+   *
+   * NOTE(review): BLOB values are encoded with the platform default charset (`getBytes`).
+   */
+  def toInnerVal(str: String, dataType: String, version: String): InnerValLike = {
+    //TODO:
+    //        logger.error(s"toInnerVal: $str, $dataType, $version")
+    // The length check keeps a lone `"` intact: it both starts and ends with a quote, and
+    // stripping it would call substring(1, 0), which throws.
+    val s =
+      if (str.length >= 2 && str.startsWith("\"") && str.endsWith("\"")) str.substring(1, str.length - 1)
+      else str
+    val dType = InnerVal.toInnerDataType(dataType)
+
+    dType match {
+      case InnerVal.STRING => InnerVal.withStr(s, version)
+      //      case t if InnerVal.NUMERICS.contains(t) => InnerVal.withNumber(BigDecimal(s), version)
+      case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+        InnerVal.withNumber(BigDecimal(s), version)
+      case InnerVal.BOOLEAN => InnerVal.withBoolean(s.toBoolean, version)
+      case InnerVal.BLOB => InnerVal.withBlob(s.getBytes, version)
+      case _ =>
+        //        InnerVal.withStr("")
+        throw new RuntimeException(s"illegal datatype for string: dataType is $dataType for $s")
+    }
+  }
+
+  /**
+   * Converts a JSON value to an inner value of the given data type (matched case-insensitively).
+   *
+   * Supported combinations:
+   *  - JSON number  -> string (its JSON text) or any numeric type
+   *  - JSON string  -> string, boolean (parsed) or any numeric type (parsed)
+   *  - JSON boolean -> string (its JSON text) or boolean
+   *
+   * @return None for every other combination, and when the conversion throws an Exception
+   *         (only the exception message is logged)
+   */
+  def jsValueToInnerVal(jsValue: JsValue, dataType: String, version: String): Option[InnerValLike] = {
+    val ret = try {
+      val dType = InnerVal.toInnerDataType(dataType.toLowerCase())
+      jsValue match {
+        case n: JsNumber =>
+          dType match {
+            case InnerVal.STRING => Some(InnerVal.withStr(jsValue.toString, version))
+            //            case t if InnerVal.NUMERICS.contains(t) =>
+            case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+              Some(InnerVal.withNumber(n.value, version))
+            case _ => None
+          }
+        case s: JsString =>
+          dType match {
+            case InnerVal.STRING => Some(InnerVal.withStr(s.value, version))
+            case InnerVal.BOOLEAN => Some(InnerVal.withBoolean(s.as[String].toBoolean, version))
+            //            case t if InnerVal.NUMERICS.contains(t) =>
+            case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE =>
+              Some(InnerVal.withNumber(BigDecimal(s.value), version))
+            case _ => None
+          }
+        case b: JsBoolean =>
+          dType match {
+            case InnerVal.STRING => Some(InnerVal.withStr(b.toString, version))
+            case InnerVal.BOOLEAN => Some(InnerVal.withBoolean(b.value, version))
+            case _ => None
+          }
+        case _ =>
+          None
+      }
+    } catch {
+      case e: Exception =>
+        logger.error(e.getMessage)
+        None
+    }
+
+    ret
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
new file mode 100644
index 0000000..b355757
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -0,0 +1,368 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.types.HBaseType._
+import org.apache.s2graph.core.types._
+import play.api.libs.json.Reads._
+import play.api.libs.json._
+
+import scala.util.Try
+
+/**
+ * This is designed to be a bridge between the REST layer and s2core.
+ * s2core itself never uses this for finding models.
+ */
+object Management extends JSONParser {
+
+  /** Plain data holders describing label/column properties and label indices. */
+  object JsonModel {
+
+    /**
+     * A property definition.
+     *
+     * @param name         property name
+     * @param defaultValue default value, kept as a string
+     * @param datatType    data type name (the field name is spelled `datatType`, sic)
+     */
+    case class Prop(name: String, defaultValue: String, datatType: String)
+
+    // Explicit companion extending Function3, so `Prop` itself can be passed
+    // wherever a `(String, String, String) => Prop` is expected.
+    object Prop extends ((String, String, String) => Prop)
+
+    /**
+     * An index definition.
+     *
+     * @param name      index name
+     * @param propNames names of the properties that make up the index
+     */
+    case class Index(name: String, propNames: Seq[String])
+
+  }
+
+  import HBaseType._
+
+  /** Compression algorithm used when callers of `createTable` / `createService` do not pass one. */
+  val DefaultCompressionAlgorithm = "gz"
+
+
+  /** Looks up a service by name with `useCache = false`. */
+  def findService(serviceName: String) =
+    Service.findByName(serviceName, useCache = false)
+
+  /**
+   * Looks up the service with the given name.
+   *
+   * NOTE(review): the actual delete call inside the loop is commented out, so
+   * this method currently has no effect beyond the lookup itself.
+   */
+  def deleteService(serviceName: String) = {
+    Service.findByName(serviceName).foreach { service =>
+      //      service.deleteAll()
+    }
+  }
+
+  /**
+   * Points an existing label at a different HTable.
+   *
+   * @param labelName     name of the label to update
+   * @param newHTableName table name to store on the label
+   * @return the result of `Label.updateHTableName`, or a `Failure` holding a
+   *         `LabelNotExistException` when the label is unknown, or an
+   *         `InvalidHTableException` when the label already uses that table
+   */
+  def updateHTable(labelName: String, newHTableName: String): Try[Int] = Try {
+    Label.findByName(labelName) match {
+      case None =>
+        throw new LabelNotExistException(s"Target label $labelName does not exist.")
+      case Some(targetLabel) if targetLabel.hTableName == newHTableName =>
+        throw new InvalidHTableException("New HTable name is already in use for target label.")
+      case Some(targetLabel) =>
+        Label.updateHTableName(targetLabel.label, newHTableName)
+    }
+  }
+
+
+
+  /**
+   * Finds or inserts a service column and then finds or inserts one column
+   * meta per given property, all inside a single `Model withTx` block.
+   *
+   * Only the name and data type of each `Prop` are used; its default value is
+   * ignored here.
+   *
+   * @param serviceName   name of an already existing service
+   * @param columnName    name of the column to find or insert
+   * @param columnType    type of the column
+   * @param props         properties to register on the column
+   * @param schemaVersion schema version passed to `ServiceColumn.findOrInsert`
+   */
+  def createServiceColumn(serviceName: String,
+                          columnName: String,
+                          columnType: String,
+                          props: Seq[Prop],
+                          schemaVersion: String = DEFAULT_VERSION) = {
+
+    Model withTx { implicit session =>
+      val service = Service.findByName(serviceName).getOrElse(
+        throw new RuntimeException(s"create service $serviceName has not been created."))
+      val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion)
+
+      props.map { case Prop(propName, _, dataType) =>
+        ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType)
+      }
+    }
+  }
+
+  /**
+   * Deletes a service column inside a `Model withTx` block and yields its name.
+   *
+   * Throws a `RuntimeException` inside the block when either the service or
+   * the column cannot be found. `schemaVersion` is accepted but not used.
+   */
+  def deleteColumn(serviceName: String, columnName: String, schemaVersion: String = DEFAULT_VERSION) = {
+    Model withTx { implicit session =>
+      val service = Service.findByName(serviceName, useCache = false)
+        .getOrElse(throw new RuntimeException("Service not Found"))
+
+      ServiceColumn.find(service.id.get, columnName, useCache = false) match {
+        case Some(serviceColumn) =>
+          ServiceColumn.delete(serviceColumn.id.get)
+          serviceColumn.columnName
+        case None =>
+          throw new RuntimeException("column not found")
+      }
+    }
+  }
+
+  /** Looks up a label by name with `useCache = false`. */
+  def findLabel(labelName: String): Option[Label] =
+    Label.findByName(labelName, useCache = false)
+
+  /**
+   * Calls `Label.deleteAll` on the label with the given name, if it exists,
+   * inside a `Model withTx` block. The block yields `labelName` whether or not
+   * a label was found.
+   */
+  def deleteLabel(labelName: String) = {
+    Model withTx { implicit session =>
+      for (label <- Label.findByName(labelName, useCache = false)) Label.deleteAll(label)
+      labelName
+    }
+  }
+
+  /**
+   * Finds or inserts the given indices on an existing label.
+   *
+   * Each index's property names are resolved through the label's
+   * `metaPropsInvMap`; a name missing from that map makes the map lookup throw.
+   *
+   * @param labelStr name of the label to add the indices to
+   * @param indices  index definitions to register
+   * @return the label that was looked up
+   */
+  def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = {
+    Model withTx { implicit session =>
+      val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found"))
+      val metaByName = label.metaPropsInvMap
+
+      for (index <- indices) {
+        val metaSeqs = index.propNames.map(propName => metaByName(propName).seq).toList
+        LabelIndex.findOrInsert(label.id.get, index.name, metaSeqs, "none")
+      }
+
+      label
+    }
+  }
+
+  /**
+   * Finds or inserts a single property (label meta) on an existing label,
+   * inside a `Model withTx` block. Throws `LabelNotExistException` inside the
+   * block when the label is unknown.
+   */
+  def addProp(labelStr: String, prop: Prop) = {
+    Model withTx { implicit session =>
+      Label.findByName(labelStr) match {
+        case Some(label) =>
+          LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.datatType)
+        case None =>
+          throw LabelNotExistException(s"$labelStr not found")
+      }
+    }
+  }
+
+  /**
+   * Finds or inserts several properties (label metas) on an existing label,
+   * inside a `Model withTx` block. Throws `LabelNotExistException` inside the
+   * block when the label is unknown.
+   */
+  def addProps(labelStr: String, props: Seq[Prop]) = {
+    Model withTx { implicit session =>
+      val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found"))
+
+      for (prop <- props)
+        yield LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.datatType)
+    }
+  }
+
+  /**
+   * Finds or inserts a property (column meta) on a service column.
+   *
+   * Throws a `RuntimeException` when either the service or the column cannot
+   * be found. `schemaVersion` is accepted but not used.
+   *
+   * @param serviceName name of the service owning the column
+   * @param columnName  name of the column
+   * @param propsName   name of the property to add
+   * @param propsType   data type of the property
+   */
+  def addVertexProp(serviceName: String,
+                    columnName: String,
+                    propsName: String,
+                    propsType: String,
+                    schemaVersion: String = DEFAULT_VERSION): ColumnMeta = {
+    Service.findByName(serviceName, useCache = false)
+      .flatMap(service => ServiceColumn.find(service.id.get, columnName))
+      .map(serviceColumn => ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType))
+      .getOrElse(throw new RuntimeException("add property on vertex failed"))
+  }
+
+  /** Looks up a label by name with `useCache = true`. */
+  def getServiceLable(label: String): Option[Label] =
+    Label.findByName(label, useCache = true)
+
+  /**
+   * Combines a label's id with a parsed direction.
+   *
+   * @param label     label whose id is used
+   * @param direction direction string, parsed by `GraphUtil.toDirection`
+   * @return the pair, or `None` when the label has no id
+   */
+  def toLabelWithDirectionAndOp(label: Label, direction: String): Option[LabelWithDirection] =
+    label.id.map { labelId =>
+      LabelWithDirection(labelId, GraphUtil.toDirection(direction))
+    }
+
+  /**
+   * Applies `f` to `key` and unwraps the result.
+   *
+   * @throws GraphExceptions.InternalException when `f(key)` is `None`
+   */
+  def tryOption[A, R](key: A, f: A => Option[R]) =
+    f(key).getOrElse(
+      throw new GraphExceptions.InternalException(s"$key is not found in DB. create $key first."))
+
+  /**
+   * Builds an `Edge` from string inputs.
+   *
+   * @param ts        timestamp; used as the edge version, as the timestamp of
+   *                  every property, and as the value of the timestamp property
+   * @param operation operation name, resolved through `GraphUtil.toOp`
+   * @param srcId     source vertex id as a string
+   * @param tgtId     target vertex id as a string
+   * @param labelStr  label name; must already exist
+   * @param direction direction string; an empty string means "out"
+   * @param props     JSON text; anything that is not a JSON object is treated
+   *                  as an empty object
+   */
+  def toEdge(ts: Long, operation: String, srcId: String, tgtId: String,
+             labelStr: String, direction: String = "", props: String): Edge = {
+
+    val label = tryOption(labelStr, getServiceLable)
+
+    // An empty direction falls back to "out" (not to the label's own direction).
+    val dir = direction match {
+      case "" => GraphUtil.directions("out")
+      case given => GraphUtil.toDirection(given)
+    }
+
+    val srcInnerId = toInnerVal(srcId, label.srcColumn.columnType, label.schemaVersion)
+    val tgtInnerId = toInnerVal(tgtId, label.tgtColumn.columnType, label.schemaVersion)
+    val srcColumnId = label.srcColumn.id.get
+    val tgtColumnId = label.tgtColumn.id.get
+
+    // For any direction other than "out" the two endpoints swap roles.
+    val isOut = dir == GraphUtil.directions("out")
+    val srcVertex =
+      if (isOut) Vertex(SourceVertexId(srcColumnId, srcInnerId), System.currentTimeMillis())
+      else Vertex(SourceVertexId(tgtColumnId, tgtInnerId), System.currentTimeMillis())
+    val tgtVertex =
+      if (isOut) Vertex(TargetVertexId(tgtColumnId, tgtInnerId), System.currentTimeMillis())
+      else Vertex(TargetVertexId(srcColumnId, srcInnerId), System.currentTimeMillis())
+
+    val labelWithDir = LabelWithDirection(label.id.get, dir)
+    val op = tryOption(operation, GraphUtil.toOp)
+
+    val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
+    val userProps = toProps(label, jsObject.fields).toMap.map {
+      case (seq, innerVal) => seq -> InnerValLikeWithTs(innerVal, ts)
+    }
+    // The timestamp property is always present and overrides a user-supplied one.
+    val propsWithTs = userProps ++
+      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, label.schemaVersion), ts))
+
+    Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = propsWithTs)
+
+  }
+
+  /**
+   * Builds a `Vertex` from string inputs.
+   *
+   * Throws a `RuntimeException` when the service or the service column does
+   * not exist. `props` is JSON text; anything that is not a JSON object is
+   * treated as an empty object.
+   */
+  def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = {
+    val service = Service.findByName(serviceName).getOrElse(
+      throw new RuntimeException(s"$serviceName does not exist. create service first."))
+    val col = ServiceColumn.find(service.id.get, columnName).getOrElse(
+      throw new RuntimeException(s"$columnName is not exist. create service column first."))
+
+    val idVal = toInnerVal(id, col.columnType, col.schemaVersion)
+    val op = tryOption(operation, GraphUtil.toOp)
+    val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj())
+    val parsedProps = toProps(col, jsObject).toMap
+
+    Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op)
+  }
+
+  /**
+   * Converts the fields of a JSON object into vertex properties.
+   *
+   * Fields whose name is not in the column's `metasInvMap` are skipped. A
+   * field that is known but whose value cannot be converted to the meta's data
+   * type raises a `RuntimeException`.
+   *
+   * @return pairs of (meta sequence as `Int`, converted value)
+   */
+  def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] =
+    js.fields.flatMap { case (key, value) =>
+      column.metasInvMap.get(key).map { meta =>
+        val converted = jsValueToInnerVal(value, meta.dataType, column.schemaVersion).getOrElse(
+          throw new RuntimeException(s"$key is not defined. create schema for vertex."))
+
+        meta.seq.toInt -> converted
+      }
+    }
+
+  /**
+   * Converts JSON fields into edge properties.
+   *
+   * Fields whose name is not in the label's `metaPropsInvMap`, and fields
+   * whose value cannot be converted to the meta's data type, are skipped.
+   *
+   * @return pairs of (meta sequence, converted value)
+   */
+  def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(Byte, InnerValLike)] =
+    js.flatMap { case (key, value) =>
+      label.metaPropsInvMap.get(key).flatMap { meta =>
+        jsValueToInnerVal(value, meta.dataType, label.schemaVersion).map(innerVal => meta.seq -> innerVal)
+      }
+    }
+
+
+  /**
+   * Renames a label.
+   *
+   * The rename only happens when a label named `oldLabelName` exists and no
+   * label named `newLabelName` exists. In every other case nothing is changed
+   * and no exception is raised.
+   */
+  def updateLabelName(oldLabelName: String, newLabelName: String) = {
+    Label.findByName(oldLabelName).foreach { _ =>
+      if (Label.findByName(newLabelName).isEmpty) {
+        Label.updateName(oldLabelName, newLabelName)
+      }
+    }
+  }
+}
+
+class Management(graph: Graph) {
+  import Management._
+  val storage = graph.storage
+
+  /** Delegates to `storage.createTable`, forwarding every argument unchanged. */
+  def createTable(zkAddr: String,
+                  tableName: String,
+                  cfs: List[String],
+                  regionMultiplier: Int,
+                  ttl: Option[Int],
+                  compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit = {
+    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm)
+  }
+
+  /**
+   * HBase specific code.
+   *
+   * Finds or inserts the service and then creates its table through `storage`,
+   * both inside one `Model withTx` block.
+   *
+   * @param serviceName          name of the service
+   * @param cluster              passed to `storage.createTable` as its first argument
+   * @param hTableName           name of the table to create
+   * @param preSplitSize         passed to `storage.createTable` as its fourth argument
+   * @param hTableTTL            optional TTL for the table
+   * @param compressionAlgorithm compression algorithm for the table
+   * @return the found or inserted service, wrapped in a `Try`
+   */
+  def createService(serviceName: String,
+                    cluster: String, hTableName: String,
+                    preSplitSize: Int, hTableTTL: Option[Int],
+                    compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = {
+
+    Model withTx { implicit session =>
+      val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
+      /** create the table for the service, with column families "e" and "v" */
+      storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
+      service
+    }
+  }
+
+  /**
+   * HBase specific code.
+   *
+   * Creates a new label together with its related models and, when the label
+   * gets its own table (`hTableName` is defined), creates that table as well.
+   *
+   * Table handling:
+   *  - no table name and no TTL: no table is created;
+   *  - a TTL without a table name: rejected with a `RuntimeException`, raised
+   *    before anything is inserted;
+   *  - a table name without a TTL: the table is created with the service's TTL;
+   *  - a table name with a TTL: the table is created with the given TTL.
+   *
+   * The existence check runs before the `Model withTx` block; everything else
+   * runs inside it.
+   *
+   * @param compressionAlgorithm compression algorithm for a label-owned table;
+   *                             defaults to `DefaultCompressionAlgorithm`
+   * @return the new label, wrapped in a `Try`
+   */
+  def createLabel(label: String,
+                  srcServiceName: String,
+                  srcColumnName: String,
+                  srcColumnType: String,
+                  tgtServiceName: String,
+                  tgtColumnName: String,
+                  tgtColumnType: String,
+                  isDirected: Boolean = true,
+                  serviceName: String,
+                  indices: Seq[Index],
+                  props: Seq[Prop],
+                  consistencyLevel: String,
+                  hTableName: Option[String],
+                  hTableTTL: Option[Int],
+                  schemaVersion: String = DEFAULT_VERSION,
+                  isAsync: Boolean,
+                  compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Label] = {
+
+    val labelOpt = Label.findByName(label, useCache = false)
+
+    Model withTx { implicit session =>
+      labelOpt match {
+        case Some(l) =>
+          throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
+        case None =>
+          // Reject the invalid option combination before writing any model.
+          if (hTableName.isEmpty && hTableTTL.isDefined)
+            throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
+
+          /** create all models */
+          val newLabel = Label.insertAll(label,
+            srcServiceName, srcColumnName, srcColumnType,
+            tgtServiceName, tgtColumnName, tgtColumnType,
+            isDirected, serviceName, indices, props, consistencyLevel,
+            hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
+
+          /** create hbase table */
+          val service = newLabel.service
+          (hTableName, hTableTTL) match {
+            case (None, _) => // no label-owned table requested; nothing to create
+            case (Some(hbaseTableName), None) =>
+              // create own hbase table with default ttl on service level.
+              storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
+            case (Some(hbaseTableName), Some(_)) =>
+              // create own hbase table with own ttl.
+              storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm)
+          }
+          newLabel
+      }
+    }
+  }
+
+  /**
+   * Copies a label under a new name: succeeds only when `oldLabelName` exists
+   * and `newLabelName` does not.
+   *
+   * All metas and indices of the old label are carried over, and the new label
+   * is created through `createLabel` with the old label's settings, except for
+   * the table name, which is taken from `hTableName`.
+   *
+   * Only used by the bulk load job. Not sure if we need to parameterize the
+   * hbase cluster.
+   *
+   * @throws LabelNotExistException     when `oldLabelName` does not exist
+   * @throws LabelAlreadyExistException when `newLabelName` already exists
+   */
+  def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
+    // A missing source label is a "not exist" condition, not "already exist".
+    val old = Label.findByName(oldLabelName).getOrElse(throw new LabelNotExistException(s"Old label $oldLabelName not exists."))
+    if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
+
+    val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
+    val allIndices = old.indices.map { index => Index(index.name, index.propNames) }
+
+    createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
+      old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
+      old.isDirected, old.serviceName,
+      allIndices, allProps,
+      old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
new file mode 100644
index 0000000..c31aa79
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala
@@ -0,0 +1,146 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.types.InnerValLike
+import play.api.libs.json.{JsNumber, JsString, JsValue}
+
+object OrderingUtil {
+
+  /**
+   * Orders JSON values: two numbers are compared numerically and two strings
+   * lexicographically. Any other pairing throws an `Exception`.
+   */
+  implicit object JsValueOrdering extends Ordering[JsValue] {
+    override def compare(x: JsValue, y: JsValue): Int = (x, y) match {
+      case (JsNumber(left), JsNumber(right)) => left.compare(right)
+      case (JsString(left), JsString(right)) => left.compareTo(right)
+      case _ => throw new Exception("unsupported type")
+    }
+  }
+
+  /** Orders `InnerValLike` values by delegating to their own `compare` method. */
+  implicit object InnerValLikeOrdering extends Ordering[InnerValLike] {
+    override def compare(x: InnerValLike, y: InnerValLike): Int = x compare y
+  }
+
+  /**
+   * Orders untyped values when both operands have the same supported type:
+   * `Int`, `Long`, `Double`, `String`, `JsValue` or `InnerValLike`.
+   *
+   * Operands of different types (for example an `Int` and a `Long`) or of any
+   * other type are rejected with an `Exception` that names both runtime types,
+   * instead of surfacing as a bare `MatchError`.
+   */
+  implicit object MultiValueOrdering extends Ordering[Any] {
+    override def compare(x: Any, y: Any): Int = {
+      (x, y) match {
+        case (xv: Int, yv: Int) => implicitly[Ordering[Int]].compare(xv, yv)
+        case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv)
+        case (xv: Double, yv: Double) => implicitly[Ordering[Double]].compare(xv, yv)
+        case (xv: String, yv: String) => implicitly[Ordering[String]].compare(xv, yv)
+        case (xv: JsValue, yv: JsValue) => implicitly[Ordering[JsValue]].compare(xv, yv)
+        case (xv: InnerValLike, yv: InnerValLike) => implicitly[Ordering[InnerValLike]].compare(xv, yv)
+        case _ =>
+          // null-safe so that building the message cannot itself fail
+          def typeName(v: Any): String = if (v == null) "null" else v.getClass.getName
+          throw new Exception(s"unsupported type: (${typeName(x)}, ${typeName(y)})")
+      }
+    }
+  }
+
+  /**
+   * Builds an ordering over 4-tuples that compares the components from left
+   * to right and stops at the first difference.
+   *
+   * Only as many components as there are flags in `ascendingLs` take part (at
+   * most four). A `true` flag compares that component in ascending order, a
+   * `false` flag in descending order.
+   *
+   * @param ascendingLs per-component direction flags
+   * @param ord         ordering of the individual components
+   */
+  def TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]): Ordering[(T, T, T, T)] = {
+    new Ordering[(T, T, T, T)] {
+      override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
+        val lefts = Iterator(tx._1, tx._2, tx._3, tx._4)
+        val rights = Iterator(ty._1, ty._2, ty._3, ty._4)
+
+        // Iterators are lazy: later components are never compared once an
+        // earlier one differs, and zipping stops when the flags run out.
+        ascendingLs.iterator
+          .zip(lefts.zip(rights))
+          .map { case (ascending, (left, right)) =>
+            if (ascending) ord.compare(left, right) else ord.compare(right, left)
+          }
+          .find(_ != 0)
+          .getOrElse(0)
+      }
+    }
+  }
+}
+
+/**
+ * Orders sequences element by element, with a direction flag per position.
+ *
+ * Position `i` uses `ascendingLs(i)` when present and `defaultAscending`
+ * otherwise. The first differing position decides the result. When one
+ * sequence is a prefix of the other, the shorter one sorts first, regardless
+ * of the flags.
+ *
+ * @param ascendingLs      per-position direction flags (`true` = ascending)
+ * @param defaultAscending direction for positions beyond `ascendingLs`
+ * @param ord              ordering of the individual elements
+ */
+class SeqMultiOrdering[T](ascendingLs: Seq[Boolean], defaultAscending: Boolean = true)(implicit ord: Ordering[T]) extends Ordering[Seq[T]] {
+  override def compare(x: Seq[T], y: Seq[T]): Int = {
+    val lefts = x.iterator
+    val rights = y.iterator
+    val flags = ascendingLs.iterator
+
+    @scala.annotation.tailrec
+    def firstDifference(): Int =
+      if (!(lefts.hasNext && rights.hasNext)) {
+        // Common prefix exhausted: whichever side still has elements is greater.
+        Ordering.Boolean.compare(lefts.hasNext, rights.hasNext)
+      } else {
+        val ascending = if (flags.hasNext) flags.next() else defaultAscending
+        val left = lefts.next()
+        val right = rights.next()
+        val result = if (ascending) ord.compare(left, right) else ord.compare(right, left)
+        if (result != 0) result else firstDifference()
+      }
+
+    firstDifference()
+  }
+}
+
+//class TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]) extends Ordering[(T, T, T, T)] {
+//  override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
+//    val len = ascendingLs.length
+//    val it = ascendingLs.iterator
+//    if (len >= 1) {
+//      val (x, y) = it.next() match {
+//        case true => tx -> ty
+//        case false => ty -> tx
+//      }
+//      val compare1 = ord.compare(x._1, y._1)
+//      if (compare1 != 0) return compare1
+//    }
+//
+//    if (len >= 2) {
+//      val (x, y) = it.next() match {
+//        case true => tx -> ty
+//        case false => ty -> tx
+//      }
+//      val compare2 = ord.compare(x._2, y._2)
+//      if (compare2 != 0) return compare2
+//    }
+//
+//    if (len >= 3) {
+//      val (x, y) = it.next() match {
+//        case true => tx -> ty
+//        case false => ty -> tx
+//      }
+//      val compare3 = ord.compare(x._3, y._3)
+//      if (compare3 != 0) return compare3
+//    }
+//
+//    if (len >= 4) {
+//      val (x, y) = it.next() match {
+//        case true => tx -> ty
+//        case false => ty -> tx
+//      }
+//      val compare4 = ord.compare(x._4, y._4)
+//      if (compare4 != 0) return compare4
+//    }
+//    0
+//  }
+//}